Skip to main content

icydb_core/db/sql/
ddl.rs

1//! Module: db::sql::ddl
2//! Responsibility: bind parsed SQL DDL to accepted schema catalog contracts.
3//! Does not own: mutation planning, physical index rebuilds, or SQL execution.
4//! Boundary: translates parser-owned DDL syntax into catalog-native requests.
5
6#![allow(
7    dead_code,
8    reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
9)]
10
11use crate::db::{
12    data::encode_runtime_value_for_accepted_field_contract,
13    predicate::parse_sql_predicate,
14    query::predicate::validate_predicate,
15    schema::{
16        AcceptedFieldDecodeContract, AcceptedSchemaSnapshot, FieldId, PersistedFieldKind,
17        PersistedFieldOrigin, PersistedFieldSnapshot, PersistedIndexExpressionOp,
18        PersistedIndexExpressionSnapshot, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
19        PersistedIndexSnapshot, SchemaDdlAcceptedSnapshotDerivation,
20        SchemaDdlIndexDropCandidateError, SchemaDdlMutationAdmission,
21        SchemaDdlMutationAdmissionError, SchemaExpressionIndexInfo,
22        SchemaExpressionIndexKeyItemInfo, SchemaFieldDefault, SchemaFieldSlot,
23        SchemaFieldWritePolicy, SchemaInfo, admit_sql_ddl_expression_index_candidate,
24        admit_sql_ddl_field_addition_candidate, admit_sql_ddl_field_default_candidate,
25        admit_sql_ddl_field_nullability_candidate, admit_sql_ddl_field_path_index_candidate,
26        admit_sql_ddl_secondary_index_drop_candidate,
27        canonicalize_strict_sql_literal_for_persisted_kind,
28        derive_sql_ddl_expression_index_accepted_after,
29        derive_sql_ddl_field_addition_accepted_after, derive_sql_ddl_field_default_accepted_after,
30        derive_sql_ddl_field_nullability_accepted_after,
31        derive_sql_ddl_field_path_index_accepted_after,
32        derive_sql_ddl_secondary_index_drop_accepted_after,
33        resolve_sql_ddl_field_drop_dependent_index, resolve_sql_ddl_secondary_index_drop_candidate,
34    },
35    sql::{
36        identifier::identifiers_tail_match,
37        parser::{
38            SqlAlterColumnAction, SqlAlterTableAddColumnStatement,
39            SqlAlterTableAlterColumnStatement, SqlAlterTableDropColumnStatement,
40            SqlAlterTableRenameColumnStatement, SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem,
41            SqlCreateIndexStatement, SqlCreateIndexUniqueness, SqlDdlStatement,
42            SqlDropIndexStatement, SqlStatement,
43        },
44    },
45};
46use crate::model::field::{FieldStorageDecode, LeafCodec, ScalarCodec};
47use thiserror::Error as ThisError;
48
49///
50/// PreparedSqlDdlCommand
51///
52/// Fully prepared SQL DDL command. This is intentionally not executable yet:
53/// it packages the accepted-catalog binding, accepted-after derivation, and
54/// schema mutation admission proof for the future execution boundary.
55///
56#[derive(Clone, Debug, Eq, PartialEq)]
57pub(in crate::db) struct PreparedSqlDdlCommand {
58    bound: BoundSqlDdlRequest,
59    derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
60    report: SqlDdlPreparationReport,
61}
62
63impl PreparedSqlDdlCommand {
64    /// Borrow the accepted-catalog-bound DDL request.
65    #[must_use]
66    pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
67        &self.bound
68    }
69
70    /// Borrow the accepted-after derivation proof.
71    #[must_use]
72    pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
73        self.derivation.as_ref()
74    }
75
76    /// Borrow the developer-facing preparation report.
77    #[must_use]
78    pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
79        &self.report
80    }
81
82    /// Return whether this prepared command needs schema or storage mutation.
83    #[must_use]
84    pub(in crate::db) const fn mutates_schema(&self) -> bool {
85        self.derivation.is_some()
86    }
87}
88
89///
90/// SqlDdlPreparationReport
91///
92/// Compact report for a DDL command that has passed all pre-execution
93/// frontend and schema-mutation checks.
94///
95#[derive(Clone, Debug, Eq, PartialEq)]
96pub struct SqlDdlPreparationReport {
97    mutation_kind: SqlDdlMutationKind,
98    target_index: String,
99    target_store: String,
100    field_path: Vec<String>,
101    execution_status: SqlDdlExecutionStatus,
102    rows_scanned: usize,
103    index_keys_written: usize,
104}
105
106impl SqlDdlPreparationReport {
107    /// Return the prepared DDL mutation kind.
108    #[must_use]
109    pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
110        self.mutation_kind
111    }
112
113    /// Borrow the target accepted index name.
114    #[must_use]
115    pub const fn target_index(&self) -> &str {
116        self.target_index.as_str()
117    }
118
119    /// Borrow the target accepted index store path.
120    #[must_use]
121    pub const fn target_store(&self) -> &str {
122        self.target_store.as_str()
123    }
124
125    /// Borrow the target field path.
126    #[must_use]
127    pub const fn field_path(&self) -> &[String] {
128        self.field_path.as_slice()
129    }
130
131    /// Return the execution status captured by this DDL report.
132    #[must_use]
133    pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
134        self.execution_status
135    }
136
137    /// Return rows scanned by DDL execution.
138    #[must_use]
139    pub const fn rows_scanned(&self) -> usize {
140        self.rows_scanned
141    }
142
143    /// Return index keys written by DDL execution.
144    #[must_use]
145    pub const fn index_keys_written(&self) -> usize {
146        self.index_keys_written
147    }
148
149    pub(in crate::db) const fn with_execution_status(
150        mut self,
151        execution_status: SqlDdlExecutionStatus,
152    ) -> Self {
153        self.execution_status = execution_status;
154        self
155    }
156
157    pub(in crate::db) const fn with_execution_metrics(
158        mut self,
159        rows_scanned: usize,
160        index_keys_written: usize,
161    ) -> Self {
162        self.rows_scanned = rows_scanned;
163        self.index_keys_written = index_keys_written;
164        self
165    }
166}
167
///
/// SqlDdlMutationKind
///
/// Developer-facing classification of one SQL DDL mutation. Each variant has
/// a stable snake_case diagnostic label, available through `as_str`.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SqlDdlMutationKind {
    AddDefaultedField,
    AddNullableField,
    SetFieldDefault,
    DropFieldDefault,
    SetFieldNotNull,
    DropFieldNotNull,
    DropField,
    RenameField,
    AddFieldPathIndex,
    AddExpressionIndex,
    DropSecondaryIndex,
}

impl SqlDdlMutationKind {
    /// Stable diagnostic label for this mutation kind.
    ///
    /// The match is exhaustive on purpose: adding a variant must force a
    /// decision about its label.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        let label: &'static str = match self {
            // Field additions.
            Self::AddDefaultedField => "add_defaulted_field",
            Self::AddNullableField => "add_nullable_field",
            // Field default changes.
            Self::SetFieldDefault => "set_field_default",
            Self::DropFieldDefault => "drop_field_default",
            // Field nullability changes.
            Self::SetFieldNotNull => "set_field_not_null",
            Self::DropFieldNotNull => "drop_field_not_null",
            // Field removal and rename.
            Self::DropField => "drop_field",
            Self::RenameField => "rename_field",
            // Index changes.
            Self::AddFieldPathIndex => "add_field_path_index",
            Self::AddExpressionIndex => "add_expression_index",
            Self::DropSecondaryIndex => "drop_secondary_index",
        };
        label
    }
}
207
///
/// SqlDdlExecutionStatus
///
/// State of a SQL DDL command at the current execution boundary. Each
/// variant has a stable snake_case diagnostic label, available through
/// `as_str`.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SqlDdlExecutionStatus {
    PreparedOnly,
    Published,
    NoOp,
}

impl SqlDdlExecutionStatus {
    /// Stable diagnostic label for this execution status.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        let label: &'static str = match self {
            Self::PreparedOnly => "prepared_only",
            Self::Published => "published",
            Self::NoOp => "no_op",
        };
        label
    }
}
231
232///
233/// BoundSqlDdlRequest
234///
235/// Accepted-catalog SQL DDL request after parser syntax has been resolved
236/// against one runtime schema snapshot.
237///
238#[derive(Clone, Debug, Eq, PartialEq)]
239pub(in crate::db) struct BoundSqlDdlRequest {
240    statement: BoundSqlDdlStatement,
241}
242
243impl BoundSqlDdlRequest {
244    /// Borrow the bound statement payload.
245    #[must_use]
246    pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
247        &self.statement
248    }
249}
250
251///
252/// BoundSqlDdlStatement
253///
254/// Catalog-resolved DDL statement vocabulary.
255///
256#[derive(Clone, Debug, Eq, PartialEq)]
257pub(in crate::db) enum BoundSqlDdlStatement {
258    AddColumn(BoundSqlAddColumnRequest),
259    AlterColumnDefault(BoundSqlAlterColumnDefaultRequest),
260    AlterColumnNullability(BoundSqlAlterColumnNullabilityRequest),
261    CreateIndex(BoundSqlCreateIndexRequest),
262    DropIndex(BoundSqlDropIndexRequest),
263    NoOp(BoundSqlDdlNoOpRequest),
264}
265
266///
267/// BoundSqlAddColumnRequest
268///
269/// Catalog-resolved additive field DDL request.
270///
271#[derive(Clone, Debug, Eq, PartialEq)]
272pub(in crate::db) struct BoundSqlAddColumnRequest {
273    entity_name: String,
274    field: PersistedFieldSnapshot,
275}
276
277impl BoundSqlAddColumnRequest {
278    /// Borrow the accepted entity name.
279    #[must_use]
280    pub(in crate::db) const fn entity_name(&self) -> &str {
281        self.entity_name.as_str()
282    }
283
284    /// Borrow the accepted DDL-owned field snapshot to publish.
285    #[must_use]
286    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
287        &self.field
288    }
289}
290
291///
292/// BoundSqlAlterColumnDefaultRequest
293///
294/// Catalog-resolved field-default metadata DDL request.
295///
296#[derive(Clone, Debug, Eq, PartialEq)]
297pub(in crate::db) struct BoundSqlAlterColumnDefaultRequest {
298    entity_name: String,
299    field: PersistedFieldSnapshot,
300    default: SchemaFieldDefault,
301    mutation_kind: SqlDdlMutationKind,
302}
303
304impl BoundSqlAlterColumnDefaultRequest {
305    /// Borrow the accepted entity name.
306    #[must_use]
307    pub(in crate::db) const fn entity_name(&self) -> &str {
308        self.entity_name.as_str()
309    }
310
311    /// Borrow the accepted field name.
312    #[must_use]
313    pub(in crate::db) const fn field_name(&self) -> &str {
314        self.field.name()
315    }
316
317    /// Borrow the accepted field whose default will change.
318    #[must_use]
319    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
320        &self.field
321    }
322
323    /// Borrow the default contract to publish.
324    #[must_use]
325    pub(in crate::db) const fn default(&self) -> &SchemaFieldDefault {
326        &self.default
327    }
328
329    /// Return the field-default mutation kind.
330    #[must_use]
331    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
332        self.mutation_kind
333    }
334}
335
336///
337/// BoundSqlAlterColumnNullabilityRequest
338///
339/// Catalog-resolved field-nullability metadata DDL request.
340///
341#[derive(Clone, Debug, Eq, PartialEq)]
342pub(in crate::db) struct BoundSqlAlterColumnNullabilityRequest {
343    entity_name: String,
344    field: PersistedFieldSnapshot,
345    nullable: bool,
346    mutation_kind: SqlDdlMutationKind,
347}
348
349impl BoundSqlAlterColumnNullabilityRequest {
350    /// Borrow the accepted entity name.
351    #[must_use]
352    pub(in crate::db) const fn entity_name(&self) -> &str {
353        self.entity_name.as_str()
354    }
355
356    /// Borrow the accepted field name.
357    #[must_use]
358    pub(in crate::db) const fn field_name(&self) -> &str {
359        self.field.name()
360    }
361
362    /// Borrow the accepted field whose nullability will change.
363    #[must_use]
364    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
365        &self.field
366    }
367
368    /// Return the nullable contract to publish.
369    #[must_use]
370    pub(in crate::db) const fn nullable(&self) -> bool {
371        self.nullable
372    }
373
374    /// Return the field-nullability mutation kind.
375    #[must_use]
376    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
377        self.mutation_kind
378    }
379}
380
381///
382/// BoundSqlDdlNoOpRequest
383///
384/// Catalog-resolved idempotent DDL request that is already satisfied or absent.
385///
386#[derive(Clone, Debug, Eq, PartialEq)]
387pub(in crate::db) struct BoundSqlDdlNoOpRequest {
388    mutation_kind: SqlDdlMutationKind,
389    index_name: String,
390    entity_name: String,
391    target_store: String,
392    field_path: Vec<String>,
393}
394
395impl BoundSqlDdlNoOpRequest {
396    /// Return the user-facing mutation family this no-op belongs to.
397    #[must_use]
398    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
399        self.mutation_kind
400    }
401
402    /// Borrow the requested index name.
403    #[must_use]
404    pub(in crate::db) const fn index_name(&self) -> &str {
405        self.index_name.as_str()
406    }
407
408    /// Borrow the accepted entity name that owns this request.
409    #[must_use]
410    pub(in crate::db) const fn entity_name(&self) -> &str {
411        self.entity_name.as_str()
412    }
413
414    /// Borrow the accepted index store path, or `-` when no target exists.
415    #[must_use]
416    pub(in crate::db) const fn target_store(&self) -> &str {
417        self.target_store.as_str()
418    }
419
420    /// Borrow the target field path, empty when no target exists.
421    #[must_use]
422    pub(in crate::db) const fn field_path(&self) -> &[String] {
423        self.field_path.as_slice()
424    }
425}
426
427///
428/// BoundSqlCreateIndexRequest
429///
430/// Catalog-resolved request for adding one secondary index.
431///
432#[derive(Clone, Debug, Eq, PartialEq)]
433pub(in crate::db) struct BoundSqlCreateIndexRequest {
434    index_name: String,
435    entity_name: String,
436    key_items: Vec<BoundSqlDdlCreateIndexKey>,
437    field_paths: Vec<BoundSqlDdlFieldPath>,
438    candidate_index: PersistedIndexSnapshot,
439}
440
441impl BoundSqlCreateIndexRequest {
442    /// Borrow the requested index name.
443    #[must_use]
444    pub(in crate::db) const fn index_name(&self) -> &str {
445        self.index_name.as_str()
446    }
447
448    /// Borrow the accepted entity name that owns this request.
449    #[must_use]
450    pub(in crate::db) const fn entity_name(&self) -> &str {
451        self.entity_name.as_str()
452    }
453
454    /// Borrow the accepted field-path targets.
455    #[must_use]
456    pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
457        self.field_paths.as_slice()
458    }
459
460    /// Borrow the accepted key targets in DDL key order.
461    #[must_use]
462    pub(in crate::db) const fn key_items(&self) -> &[BoundSqlDdlCreateIndexKey] {
463        self.key_items.as_slice()
464    }
465
466    /// Borrow the candidate accepted index snapshot for mutation admission.
467    #[must_use]
468    pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
469        &self.candidate_index
470    }
471}
472
473///
474/// BoundSqlDropIndexRequest
475///
476/// Catalog-resolved request for dropping one DDL-published secondary index.
477///
478#[derive(Clone, Debug, Eq, PartialEq)]
479pub(in crate::db) struct BoundSqlDropIndexRequest {
480    index_name: String,
481    entity_name: String,
482    dropped_index: PersistedIndexSnapshot,
483    field_path: Vec<String>,
484}
485
486impl BoundSqlDropIndexRequest {
487    /// Borrow the requested index name.
488    #[must_use]
489    pub(in crate::db) const fn index_name(&self) -> &str {
490        self.index_name.as_str()
491    }
492
493    /// Borrow the accepted entity name that owns this request.
494    #[must_use]
495    pub(in crate::db) const fn entity_name(&self) -> &str {
496        self.entity_name.as_str()
497    }
498
499    /// Borrow the accepted index snapshot that will be removed.
500    #[must_use]
501    pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
502        &self.dropped_index
503    }
504
505    /// Borrow the dropped field-path target.
506    #[must_use]
507    pub(in crate::db) const fn field_path(&self) -> &[String] {
508        self.field_path.as_slice()
509    }
510}
511
512///
513/// BoundSqlDdlFieldPath
514///
515/// Accepted field-path target for SQL DDL binding.
516///
517#[derive(Clone, Debug, Eq, PartialEq)]
518pub(in crate::db) struct BoundSqlDdlFieldPath {
519    root: String,
520    segments: Vec<String>,
521    accepted_path: Vec<String>,
522}
523
524impl BoundSqlDdlFieldPath {
525    /// Borrow the top-level field name.
526    #[must_use]
527    pub(in crate::db) const fn root(&self) -> &str {
528        self.root.as_str()
529    }
530
531    /// Borrow nested path segments below the top-level field.
532    #[must_use]
533    pub(in crate::db) const fn segments(&self) -> &[String] {
534        self.segments.as_slice()
535    }
536
537    /// Borrow the full accepted field path used by index metadata.
538    #[must_use]
539    pub(in crate::db) const fn accepted_path(&self) -> &[String] {
540        self.accepted_path.as_slice()
541    }
542}
543
544///
545/// SqlDdlBindError
546///
547/// Typed fail-closed reasons for SQL DDL catalog binding.
548///
549#[derive(Debug, Eq, PartialEq, ThisError)]
550pub(in crate::db) enum SqlDdlBindError {
551    #[error("SQL DDL binder requires a DDL statement")]
552    NotDdl,
553
554    #[error("accepted schema does not expose an entity name")]
555    MissingEntityName,
556
557    #[error("accepted schema does not expose an entity path")]
558    MissingEntityPath,
559
560    #[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
561    EntityMismatch {
562        sql_entity: String,
563        expected_entity: String,
564    },
565
566    #[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
567    UnknownFieldPath {
568        entity_name: String,
569        field_path: String,
570    },
571
572    #[error("field path '{field_path}' is not indexable")]
573    FieldPathNotIndexable { field_path: String },
574
575    #[error("field path '{field_path}' depends on generated-only metadata")]
576    FieldPathNotAcceptedCatalogBacked { field_path: String },
577
578    #[error("invalid filtered index predicate: {detail}")]
579    InvalidFilteredIndexPredicate { detail: String },
580
581    #[error("index name '{index_name}' already exists in the accepted schema")]
582    DuplicateIndexName { index_name: String },
583
584    #[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
585    DuplicateFieldPathIndex {
586        field_path: String,
587        existing_index: String,
588    },
589
590    #[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
591    UnknownIndex {
592        entity_name: String,
593        index_name: String,
594    },
595
596    #[error(
597        "index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
598    )]
599    GeneratedIndexDropRejected { index_name: String },
600
601    #[error(
602        "index '{index_name}' is not a supported DDL-droppable secondary index; SQL DDL can currently drop only indexes created through SQL DDL"
603    )]
604    UnsupportedDropIndex { index_name: String },
605
606    #[error(
607        "SQL DDL ALTER TABLE ADD COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
608    )]
609    UnsupportedAlterTableAddColumn {
610        entity_name: String,
611        column_name: String,
612    },
613
614    #[error(
615        "SQL DDL ALTER TABLE ADD COLUMN DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
616    )]
617    InvalidAlterTableAddColumnDefault {
618        entity_name: String,
619        column_name: String,
620        detail: String,
621    },
622
623    #[error(
624        "SQL DDL ALTER TABLE ADD COLUMN NOT NULL is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
625    )]
626    UnsupportedAlterTableAddColumnNotNull {
627        entity_name: String,
628        column_name: String,
629    },
630
631    #[error("field '{column_name}' already exists in accepted entity '{entity_name}'")]
632    DuplicateColumn {
633        entity_name: String,
634        column_name: String,
635    },
636
637    #[error(
638        "SQL DDL ALTER TABLE ADD COLUMN type '{column_type}' is not supported yet for accepted entity '{entity_name}' column '{column_name}'"
639    )]
640    UnsupportedAlterTableAddColumnType {
641        entity_name: String,
642        column_name: String,
643        column_type: String,
644    },
645
646    #[error("unknown column '{column_name}' for accepted entity '{entity_name}'")]
647    UnknownColumn {
648        entity_name: String,
649        column_name: String,
650    },
651
652    #[error(
653        "SQL DDL ALTER TABLE ALTER COLUMN {action} is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
654    )]
655    UnsupportedAlterTableAlterColumn {
656        entity_name: String,
657        column_name: String,
658        action: String,
659    },
660
661    #[error(
662        "SQL DDL ALTER TABLE ALTER COLUMN SET DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
663    )]
664    InvalidAlterTableAlterColumnDefault {
665        entity_name: String,
666        column_name: String,
667        detail: String,
668    },
669
670    #[error(
671        "SQL DDL ALTER TABLE ALTER COLUMN DROP DEFAULT is not executable yet for required accepted entity '{entity_name}' column '{column_name}'"
672    )]
673    UnsupportedAlterTableDropDefaultRequired {
674        entity_name: String,
675        column_name: String,
676    },
677
678    #[error(
679        "SQL DDL ALTER TABLE ALTER COLUMN DEFAULT cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema default instead"
680    )]
681    GeneratedFieldDefaultChangeRejected {
682        entity_name: String,
683        column_name: String,
684    },
685
686    #[error(
687        "SQL DDL ALTER TABLE ALTER COLUMN NULLABILITY cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema nullability instead"
688    )]
689    GeneratedFieldNullabilityChangeRejected {
690        entity_name: String,
691        column_name: String,
692    },
693
694    #[error(
695        "SQL DDL ALTER TABLE DROP COLUMN cannot drop primary-key field '{column_name}' on entity '{entity_name}'"
696    )]
697    PrimaryKeyFieldDropRejected {
698        entity_name: String,
699        column_name: String,
700    },
701
702    #[error(
703        "SQL DDL ALTER TABLE DROP COLUMN cannot change generated accepted field '{column_name}' on entity '{entity_name}'; remove the field from the Rust schema instead"
704    )]
705    GeneratedFieldDropRejected {
706        entity_name: String,
707        column_name: String,
708    },
709
710    #[error(
711        "SQL DDL ALTER TABLE DROP COLUMN cannot drop accepted field '{column_name}' on entity '{entity_name}' while index '{index_name}' depends on it; drop dependent DDL-owned indexes first"
712    )]
713    IndexedFieldDropRejected {
714        entity_name: String,
715        column_name: String,
716        index_name: String,
717    },
718
719    #[error(
720        "SQL DDL ALTER TABLE DROP COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'; retained-slot field removal policy is not enabled yet"
721    )]
722    UnsupportedAlterTableDropColumn {
723        entity_name: String,
724        column_name: String,
725    },
726
727    #[error(
728        "SQL DDL ALTER TABLE RENAME COLUMN cannot change generated accepted field '{column_name}' on entity '{entity_name}'; rename the field in the Rust schema instead"
729    )]
730    GeneratedFieldRenameRejected {
731        entity_name: String,
732        column_name: String,
733    },
734
735    #[error(
736        "SQL DDL ALTER TABLE RENAME COLUMN is not executable yet for accepted entity '{entity_name}' column '{old_column_name}' to '{new_column_name}'; field rename metadata policy is not enabled yet"
737    )]
738    UnsupportedAlterTableRenameColumn {
739        entity_name: String,
740        old_column_name: String,
741        new_column_name: String,
742    },
743}
744
745///
746/// SqlDdlLoweringError
747///
748/// Typed fail-closed reasons while lowering bound DDL into schema mutation
749/// admission.
750///
751#[derive(Debug, Eq, PartialEq, ThisError)]
752pub(in crate::db) enum SqlDdlLoweringError {
753    #[error("SQL DDL lowering requires a supported DDL statement")]
754    UnsupportedStatement,
755
756    #[error("schema mutation admission rejected DDL candidate: {0:?}")]
757    MutationAdmission(SchemaDdlMutationAdmissionError),
758}
759
760///
761/// SqlDdlPrepareError
762///
763/// Typed fail-closed preparation errors for SQL DDL.
764///
765#[derive(Debug, Eq, PartialEq, ThisError)]
766pub(in crate::db) enum SqlDdlPrepareError {
767    #[error("{0}")]
768    Bind(#[from] SqlDdlBindError),
769
770    #[error("{0}")]
771    Lowering(#[from] SqlDdlLoweringError),
772}
773
774/// Prepare one parsed SQL DDL statement through every pre-execution proof.
775pub(in crate::db) fn prepare_sql_ddl_statement(
776    statement: &SqlStatement,
777    accepted_before: &AcceptedSchemaSnapshot,
778    schema: &SchemaInfo,
779    index_store_path: &'static str,
780) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
781    let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
782    let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
783        None
784    } else {
785        Some(derive_bound_sql_ddl_accepted_after(
786            accepted_before,
787            &bound,
788        )?)
789    };
790    let report = ddl_preparation_report(&bound);
791
792    Ok(PreparedSqlDdlCommand {
793        bound,
794        derivation,
795        report,
796    })
797}
798
799/// Bind one parsed SQL DDL statement against accepted catalog metadata.
800pub(in crate::db) fn bind_sql_ddl_statement(
801    statement: &SqlStatement,
802    accepted_before: &AcceptedSchemaSnapshot,
803    schema: &SchemaInfo,
804    index_store_path: &'static str,
805) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
806    let SqlStatement::Ddl(ddl) = statement else {
807        return Err(SqlDdlBindError::NotDdl);
808    };
809
810    match ddl {
811        SqlDdlStatement::CreateIndex(statement) => {
812            bind_create_index_statement(statement, schema, index_store_path)
813        }
814        SqlDdlStatement::DropIndex(statement) => {
815            bind_drop_index_statement(statement, accepted_before, schema)
816        }
817        SqlDdlStatement::AlterTableAddColumn(statement) => {
818            bind_alter_table_add_column_statement(statement, accepted_before, schema)
819        }
820        SqlDdlStatement::AlterTableAlterColumn(statement) => {
821            bind_alter_table_alter_column_statement(statement, accepted_before, schema)
822        }
823        SqlDdlStatement::AlterTableDropColumn(statement) => {
824            bind_alter_table_drop_column_statement(statement, accepted_before, schema)
825        }
826        SqlDdlStatement::AlterTableRenameColumn(statement) => {
827            bind_alter_table_rename_column_statement(statement, accepted_before, schema)
828        }
829    }
830}
831
832fn bind_create_index_statement(
833    statement: &SqlCreateIndexStatement,
834    schema: &SchemaInfo,
835    index_store_path: &'static str,
836) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
837    let entity_name = schema
838        .entity_name()
839        .ok_or(SqlDdlBindError::MissingEntityName)?;
840
841    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
842        return Err(SqlDdlBindError::EntityMismatch {
843            sql_entity: statement.entity.clone(),
844            expected_entity: entity_name.to_string(),
845        });
846    }
847
848    let key_items = statement
849        .key_items
850        .iter()
851        .map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
852        .collect::<Result<Vec<_>, _>>()?;
853    let field_paths = create_index_field_path_report_items(key_items.as_slice());
854    if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
855        if key_items_are_field_path_only(key_items.as_slice())
856            && statement.if_not_exists
857            && existing_field_path_index_matches_request(
858                existing_index,
859                field_paths.as_slice(),
860                statement.predicate_sql.as_deref(),
861                statement.uniqueness,
862            )
863        {
864            return Ok(BoundSqlDdlRequest {
865                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
866                    mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
867                    index_name: statement.name.clone(),
868                    entity_name: entity_name.to_string(),
869                    target_store: existing_index.store().to_string(),
870                    field_path: ddl_field_path_report(field_paths.as_slice()),
871                }),
872            });
873        }
874
875        return Err(SqlDdlBindError::DuplicateIndexName {
876            index_name: statement.name.clone(),
877        });
878    }
879    let predicate_sql =
880        validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
881    if let Some(existing_index) = find_expression_index_by_name(schema, statement.name.as_str()) {
882        if statement.if_not_exists
883            && existing_expression_index_matches_request(
884                existing_index,
885                key_items.as_slice(),
886                predicate_sql.as_deref(),
887                statement.uniqueness,
888            )
889        {
890            return Ok(BoundSqlDdlRequest {
891                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
892                    mutation_kind: SqlDdlMutationKind::AddExpressionIndex,
893                    index_name: statement.name.clone(),
894                    entity_name: entity_name.to_string(),
895                    target_store: existing_index.store().to_string(),
896                    field_path: ddl_key_item_report(key_items.as_slice()),
897                }),
898            });
899        }
900
901        return Err(SqlDdlBindError::DuplicateIndexName {
902            index_name: statement.name.clone(),
903        });
904    }
905    if key_items_are_field_path_only(key_items.as_slice()) {
906        reject_duplicate_field_path_index(
907            field_paths.as_slice(),
908            predicate_sql.as_deref(),
909            schema,
910        )?;
911    } else {
912        reject_duplicate_expression_index(key_items.as_slice(), predicate_sql.as_deref(), schema)?;
913    }
914    let candidate_index = candidate_index_snapshot(
915        statement.name.as_str(),
916        key_items.as_slice(),
917        predicate_sql.as_deref(),
918        statement.uniqueness,
919        schema,
920        index_store_path,
921    )?;
922
923    Ok(BoundSqlDdlRequest {
924        statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
925            index_name: statement.name.clone(),
926            entity_name: entity_name.to_string(),
927            key_items,
928            field_paths,
929            candidate_index,
930        }),
931    })
932}
933
934fn bind_drop_index_statement(
935    statement: &SqlDropIndexStatement,
936    accepted_before: &AcceptedSchemaSnapshot,
937    schema: &SchemaInfo,
938) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
939    let entity_name = schema
940        .entity_name()
941        .ok_or(SqlDdlBindError::MissingEntityName)?;
942
943    if let Some(sql_entity) = statement.entity.as_deref()
944        && !identifiers_tail_match(sql_entity, entity_name)
945    {
946        return Err(SqlDdlBindError::EntityMismatch {
947            sql_entity: sql_entity.to_string(),
948            expected_entity: entity_name.to_string(),
949        });
950    }
951    let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
952        accepted_before,
953        &statement.name,
954    )
955    .map_err(|error| match error {
956        SchemaDdlIndexDropCandidateError::Generated => {
957            SqlDdlBindError::GeneratedIndexDropRejected {
958                index_name: statement.name.clone(),
959            }
960        }
961        SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
962            entity_name: entity_name.to_string(),
963            index_name: statement.name.clone(),
964        },
965        SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
966            index_name: statement.name.clone(),
967        },
968    });
969    let (dropped_index, field_path) = match drop_candidate {
970        Ok((dropped_index, field_path)) => (dropped_index, field_path),
971        Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
972            return Ok(BoundSqlDdlRequest {
973                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
974                    mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
975                    index_name: statement.name.clone(),
976                    entity_name: entity_name.to_string(),
977                    target_store: "-".to_string(),
978                    field_path: Vec::new(),
979                }),
980            });
981        }
982        Err(error) => return Err(error),
983    };
984    Ok(BoundSqlDdlRequest {
985        statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
986            index_name: statement.name.clone(),
987            entity_name: entity_name.to_string(),
988            dropped_index,
989            field_path,
990        }),
991    })
992}
993
994fn bind_alter_table_add_column_statement(
995    statement: &SqlAlterTableAddColumnStatement,
996    accepted_before: &AcceptedSchemaSnapshot,
997    schema: &SchemaInfo,
998) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
999    let entity_name = schema
1000        .entity_name()
1001        .ok_or(SqlDdlBindError::MissingEntityName)?;
1002
1003    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1004        return Err(SqlDdlBindError::EntityMismatch {
1005            sql_entity: statement.entity.clone(),
1006            expected_entity: entity_name.to_string(),
1007        });
1008    }
1009
1010    if schema
1011        .field_nullable(statement.column_name.as_str())
1012        .is_some()
1013    {
1014        return Err(SqlDdlBindError::DuplicateColumn {
1015            entity_name: entity_name.to_string(),
1016            column_name: statement.column_name.clone(),
1017        });
1018    }
1019
1020    let (kind, storage_decode, leaf_codec) = persisted_field_contract_for_sql_column_type(
1021        statement.column_type.as_str(),
1022    )
1023    .ok_or_else(|| SqlDdlBindError::UnsupportedAlterTableAddColumnType {
1024        entity_name: entity_name.to_string(),
1025        column_name: statement.column_name.clone(),
1026        column_type: statement.column_type.clone(),
1027    })?;
1028    let default = schema_field_default_for_sql_default(
1029        entity_name,
1030        statement.column_name.as_str(),
1031        statement.default.as_ref(),
1032        &kind,
1033        statement.nullable,
1034        storage_decode,
1035        leaf_codec,
1036    )?;
1037    if !statement.nullable && default.is_none() {
1038        return Err(SqlDdlBindError::UnsupportedAlterTableAddColumnNotNull {
1039            entity_name: entity_name.to_string(),
1040            column_name: statement.column_name.clone(),
1041        });
1042    }
1043    let field = PersistedFieldSnapshot::new_with_write_policy_and_origin(
1044        next_sql_ddl_field_id(accepted_before),
1045        statement.column_name.clone(),
1046        next_sql_ddl_field_slot(accepted_before),
1047        kind,
1048        Vec::new(),
1049        statement.nullable,
1050        default,
1051        SchemaFieldWritePolicy::from_model_policies(None, None),
1052        PersistedFieldOrigin::SqlDdl,
1053        storage_decode,
1054        leaf_codec,
1055    );
1056
1057    Ok(BoundSqlDdlRequest {
1058        statement: BoundSqlDdlStatement::AddColumn(BoundSqlAddColumnRequest {
1059            entity_name: entity_name.to_string(),
1060            field,
1061        }),
1062    })
1063}
1064
1065fn alter_table_alter_column_bind_error(
1066    statement: &SqlAlterTableAlterColumnStatement,
1067    schema: &SchemaInfo,
1068) -> SqlDdlBindError {
1069    let Some(entity_name) = schema.entity_name() else {
1070        return SqlDdlBindError::MissingEntityName;
1071    };
1072
1073    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1074        return SqlDdlBindError::EntityMismatch {
1075            sql_entity: statement.entity.clone(),
1076            expected_entity: entity_name.to_string(),
1077        };
1078    }
1079
1080    if schema
1081        .field_nullable(statement.column_name.as_str())
1082        .is_none()
1083    {
1084        return SqlDdlBindError::UnknownColumn {
1085            entity_name: entity_name.to_string(),
1086            column_name: statement.column_name.clone(),
1087        };
1088    }
1089
1090    SqlDdlBindError::UnsupportedAlterTableAlterColumn {
1091        entity_name: entity_name.to_string(),
1092        column_name: statement.column_name.clone(),
1093        action: statement.action.label().to_string(),
1094    }
1095}
1096
1097fn bind_alter_table_alter_column_statement(
1098    statement: &SqlAlterTableAlterColumnStatement,
1099    accepted_before: &AcceptedSchemaSnapshot,
1100    schema: &SchemaInfo,
1101) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1102    let Some(entity_name) = schema.entity_name() else {
1103        return Err(SqlDdlBindError::MissingEntityName);
1104    };
1105
1106    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1107        return Err(SqlDdlBindError::EntityMismatch {
1108            sql_entity: statement.entity.clone(),
1109            expected_entity: entity_name.to_string(),
1110        });
1111    }
1112
1113    let field = accepted_before
1114        .persisted_snapshot()
1115        .fields()
1116        .iter()
1117        .find(|field| field.name() == statement.column_name)
1118        .ok_or_else(|| SqlDdlBindError::UnknownColumn {
1119            entity_name: entity_name.to_string(),
1120            column_name: statement.column_name.clone(),
1121        })?;
1122
1123    match &statement.action {
1124        SqlAlterColumnAction::SetDefault(default) => {
1125            reject_generated_field_default_change(entity_name, field)?;
1126            let default =
1127                schema_field_default_for_alter_column_default(entity_name, field, default)?;
1128            Ok(bind_alter_table_alter_column_default(
1129                entity_name,
1130                field,
1131                default,
1132                SqlDdlMutationKind::SetFieldDefault,
1133            ))
1134        }
1135        SqlAlterColumnAction::DropDefault => {
1136            if !field.default().is_none() {
1137                reject_generated_field_default_change(entity_name, field)?;
1138            }
1139            if !field.nullable() && !field.default().is_none() {
1140                return Err(SqlDdlBindError::UnsupportedAlterTableDropDefaultRequired {
1141                    entity_name: entity_name.to_string(),
1142                    column_name: statement.column_name.clone(),
1143                });
1144            }
1145            Ok(bind_alter_table_alter_column_default(
1146                entity_name,
1147                field,
1148                SchemaFieldDefault::None,
1149                SqlDdlMutationKind::DropFieldDefault,
1150            ))
1151        }
1152        SqlAlterColumnAction::SetNotNull => Ok(bind_alter_table_alter_column_nullability(
1153            entity_name,
1154            field,
1155            false,
1156            SqlDdlMutationKind::SetFieldNotNull,
1157        )?),
1158        SqlAlterColumnAction::DropNotNull => Ok(bind_alter_table_alter_column_nullability(
1159            entity_name,
1160            field,
1161            true,
1162            SqlDdlMutationKind::DropFieldNotNull,
1163        )?),
1164    }
1165}
1166
1167fn bind_alter_table_drop_column_statement(
1168    statement: &SqlAlterTableDropColumnStatement,
1169    accepted_before: &AcceptedSchemaSnapshot,
1170    schema: &SchemaInfo,
1171) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1172    let entity_name = schema
1173        .entity_name()
1174        .ok_or(SqlDdlBindError::MissingEntityName)?;
1175
1176    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1177        return Err(SqlDdlBindError::EntityMismatch {
1178            sql_entity: statement.entity.clone(),
1179            expected_entity: entity_name.to_string(),
1180        });
1181    }
1182
1183    let accepted = accepted_before.persisted_snapshot();
1184    let Some(field) = accepted
1185        .fields()
1186        .iter()
1187        .find(|field| field.name() == statement.column_name)
1188    else {
1189        if statement.if_exists {
1190            return Ok(BoundSqlDdlRequest {
1191                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1192                    mutation_kind: SqlDdlMutationKind::DropField,
1193                    index_name: statement.column_name.clone(),
1194                    entity_name: entity_name.to_string(),
1195                    target_store: "-".to_string(),
1196                    field_path: vec![statement.column_name.clone()],
1197                }),
1198            });
1199        }
1200
1201        return Err(SqlDdlBindError::UnknownColumn {
1202            entity_name: entity_name.to_string(),
1203            column_name: statement.column_name.clone(),
1204        });
1205    };
1206
1207    if accepted.primary_key_field_id() == field.id() {
1208        return Err(SqlDdlBindError::PrimaryKeyFieldDropRejected {
1209            entity_name: entity_name.to_string(),
1210            column_name: statement.column_name.clone(),
1211        });
1212    }
1213
1214    if field.generated() {
1215        return Err(SqlDdlBindError::GeneratedFieldDropRejected {
1216            entity_name: entity_name.to_string(),
1217            column_name: statement.column_name.clone(),
1218        });
1219    }
1220
1221    if let Some(index_name) =
1222        resolve_sql_ddl_field_drop_dependent_index(accepted_before, field.id())
1223    {
1224        return Err(SqlDdlBindError::IndexedFieldDropRejected {
1225            entity_name: entity_name.to_string(),
1226            column_name: statement.column_name.clone(),
1227            index_name,
1228        });
1229    }
1230
1231    Err(SqlDdlBindError::UnsupportedAlterTableDropColumn {
1232        entity_name: entity_name.to_string(),
1233        column_name: statement.column_name.clone(),
1234    })
1235}
1236
1237fn bind_alter_table_rename_column_statement(
1238    statement: &SqlAlterTableRenameColumnStatement,
1239    accepted_before: &AcceptedSchemaSnapshot,
1240    schema: &SchemaInfo,
1241) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1242    let entity_name = schema
1243        .entity_name()
1244        .ok_or(SqlDdlBindError::MissingEntityName)?;
1245
1246    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1247        return Err(SqlDdlBindError::EntityMismatch {
1248            sql_entity: statement.entity.clone(),
1249            expected_entity: entity_name.to_string(),
1250        });
1251    }
1252
1253    let accepted = accepted_before.persisted_snapshot();
1254    let Some(field) = accepted
1255        .fields()
1256        .iter()
1257        .find(|field| field.name() == statement.old_column_name)
1258    else {
1259        return Err(SqlDdlBindError::UnknownColumn {
1260            entity_name: entity_name.to_string(),
1261            column_name: statement.old_column_name.clone(),
1262        });
1263    };
1264
1265    if statement.old_column_name == statement.new_column_name {
1266        return Ok(BoundSqlDdlRequest {
1267            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1268                mutation_kind: SqlDdlMutationKind::RenameField,
1269                index_name: statement.old_column_name.clone(),
1270                entity_name: entity_name.to_string(),
1271                target_store: "-".to_string(),
1272                field_path: vec![statement.old_column_name.clone()],
1273            }),
1274        });
1275    }
1276
1277    if accepted
1278        .fields()
1279        .iter()
1280        .any(|field| field.name() == statement.new_column_name)
1281    {
1282        return Err(SqlDdlBindError::DuplicateColumn {
1283            entity_name: entity_name.to_string(),
1284            column_name: statement.new_column_name.clone(),
1285        });
1286    }
1287
1288    if field.generated() {
1289        return Err(SqlDdlBindError::GeneratedFieldRenameRejected {
1290            entity_name: entity_name.to_string(),
1291            column_name: statement.old_column_name.clone(),
1292        });
1293    }
1294
1295    Err(SqlDdlBindError::UnsupportedAlterTableRenameColumn {
1296        entity_name: entity_name.to_string(),
1297        old_column_name: statement.old_column_name.clone(),
1298        new_column_name: statement.new_column_name.clone(),
1299    })
1300}
1301
1302fn bind_alter_table_alter_column_default(
1303    entity_name: &str,
1304    field: &PersistedFieldSnapshot,
1305    default: SchemaFieldDefault,
1306    mutation_kind: SqlDdlMutationKind,
1307) -> BoundSqlDdlRequest {
1308    if field.default() == &default {
1309        return BoundSqlDdlRequest {
1310            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1311                mutation_kind,
1312                index_name: field.name().to_string(),
1313                entity_name: entity_name.to_string(),
1314                target_store: entity_name.to_string(),
1315                field_path: vec![field.name().to_string()],
1316            }),
1317        };
1318    }
1319
1320    BoundSqlDdlRequest {
1321        statement: BoundSqlDdlStatement::AlterColumnDefault(BoundSqlAlterColumnDefaultRequest {
1322            entity_name: entity_name.to_string(),
1323            field: field.clone(),
1324            default,
1325            mutation_kind,
1326        }),
1327    }
1328}
1329
1330fn reject_generated_field_default_change(
1331    entity_name: &str,
1332    field: &PersistedFieldSnapshot,
1333) -> Result<(), SqlDdlBindError> {
1334    if field.generated() {
1335        return Err(SqlDdlBindError::GeneratedFieldDefaultChangeRejected {
1336            entity_name: entity_name.to_string(),
1337            column_name: field.name().to_string(),
1338        });
1339    }
1340
1341    Ok(())
1342}
1343
1344fn bind_alter_table_alter_column_nullability(
1345    entity_name: &str,
1346    field: &PersistedFieldSnapshot,
1347    nullable: bool,
1348    mutation_kind: SqlDdlMutationKind,
1349) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1350    if field.nullable() == nullable {
1351        return Ok(BoundSqlDdlRequest {
1352            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1353                mutation_kind,
1354                index_name: field.name().to_string(),
1355                entity_name: entity_name.to_string(),
1356                target_store: entity_name.to_string(),
1357                field_path: vec![field.name().to_string()],
1358            }),
1359        });
1360    }
1361
1362    reject_generated_field_nullability_change(entity_name, field)?;
1363
1364    Ok(BoundSqlDdlRequest {
1365        statement: BoundSqlDdlStatement::AlterColumnNullability(
1366            BoundSqlAlterColumnNullabilityRequest {
1367                entity_name: entity_name.to_string(),
1368                field: field.clone(),
1369                nullable,
1370                mutation_kind,
1371            },
1372        ),
1373    })
1374}
1375
1376fn reject_generated_field_nullability_change(
1377    entity_name: &str,
1378    field: &PersistedFieldSnapshot,
1379) -> Result<(), SqlDdlBindError> {
1380    if field.generated() {
1381        return Err(SqlDdlBindError::GeneratedFieldNullabilityChangeRejected {
1382            entity_name: entity_name.to_string(),
1383            column_name: field.name().to_string(),
1384        });
1385    }
1386
1387    Ok(())
1388}
1389
1390fn schema_field_default_for_sql_default(
1391    entity_name: &str,
1392    column_name: &str,
1393    default: Option<&crate::value::Value>,
1394    kind: &PersistedFieldKind,
1395    nullable: bool,
1396    storage_decode: FieldStorageDecode,
1397    leaf_codec: LeafCodec,
1398) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1399    let Some(default) = default else {
1400        return Ok(SchemaFieldDefault::None);
1401    };
1402    if matches!(default, crate::value::Value::Null) {
1403        return Err(SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1404            entity_name: entity_name.to_string(),
1405            column_name: column_name.to_string(),
1406            detail: "NULL cannot be used as an accepted database default".to_string(),
1407        });
1408    }
1409
1410    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(kind, default)
1411        .unwrap_or_else(|| default.clone());
1412    let contract =
1413        AcceptedFieldDecodeContract::new(column_name, kind, nullable, storage_decode, leaf_codec);
1414    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1415        |error| SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1416            entity_name: entity_name.to_string(),
1417            column_name: column_name.to_string(),
1418            detail: error.to_string(),
1419        },
1420    )?;
1421
1422    Ok(SchemaFieldDefault::SlotPayload(payload))
1423}
1424
1425fn schema_field_default_for_alter_column_default(
1426    entity_name: &str,
1427    field: &PersistedFieldSnapshot,
1428    default: &crate::value::Value,
1429) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1430    if matches!(default, crate::value::Value::Null) {
1431        return Err(SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1432            entity_name: entity_name.to_string(),
1433            column_name: field.name().to_string(),
1434            detail: "NULL cannot be used as an accepted database default".to_string(),
1435        });
1436    }
1437
1438    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(field.kind(), default)
1439        .unwrap_or_else(|| default.clone());
1440    let contract = AcceptedFieldDecodeContract::new(
1441        field.name(),
1442        field.kind(),
1443        field.nullable(),
1444        field.storage_decode(),
1445        field.leaf_codec(),
1446    );
1447    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1448        |error| SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1449            entity_name: entity_name.to_string(),
1450            column_name: field.name().to_string(),
1451            detail: error.to_string(),
1452        },
1453    )?;
1454
1455    Ok(SchemaFieldDefault::SlotPayload(payload))
1456}
1457
1458fn next_sql_ddl_field_id(accepted_before: &AcceptedSchemaSnapshot) -> FieldId {
1459    let next = accepted_before
1460        .persisted_snapshot()
1461        .fields()
1462        .iter()
1463        .map(|field| field.id().get())
1464        .max()
1465        .unwrap_or(0)
1466        .checked_add(1)
1467        .expect("accepted field IDs should not be exhausted");
1468
1469    FieldId::new(next)
1470}
1471
1472fn next_sql_ddl_field_slot(accepted_before: &AcceptedSchemaSnapshot) -> SchemaFieldSlot {
1473    let next = accepted_before
1474        .persisted_snapshot()
1475        .row_layout()
1476        .field_to_slot()
1477        .iter()
1478        .map(|(_, slot)| slot.get())
1479        .max()
1480        .unwrap_or(0)
1481        .checked_add(1)
1482        .expect("accepted row slots should not be exhausted");
1483
1484    SchemaFieldSlot::new(next)
1485}
1486
1487fn persisted_field_contract_for_sql_column_type(
1488    column_type: &str,
1489) -> Option<(PersistedFieldKind, FieldStorageDecode, LeafCodec)> {
1490    let normalized = column_type.trim().to_ascii_lowercase();
1491    match normalized.as_str() {
1492        "bool" | "boolean" => Some((
1493            PersistedFieldKind::Bool,
1494            FieldStorageDecode::ByKind,
1495            LeafCodec::Scalar(ScalarCodec::Bool),
1496        )),
1497        "int" | "integer" => Some((
1498            PersistedFieldKind::Int,
1499            FieldStorageDecode::ByKind,
1500            LeafCodec::Scalar(ScalarCodec::Int64),
1501        )),
1502        "nat" | "natural" => Some((
1503            PersistedFieldKind::Nat,
1504            FieldStorageDecode::ByKind,
1505            LeafCodec::Scalar(ScalarCodec::Nat64),
1506        )),
1507        "text" | "string" => Some((
1508            PersistedFieldKind::Text { max_len: None },
1509            FieldStorageDecode::ByKind,
1510            LeafCodec::Scalar(ScalarCodec::Text),
1511        )),
1512        _ => None,
1513    }
1514}
1515
1516#[derive(Clone, Debug, Eq, PartialEq)]
1517pub(in crate::db) enum BoundSqlDdlCreateIndexKey {
1518    FieldPath(BoundSqlDdlFieldPath),
1519    Expression(BoundSqlDdlExpressionKey),
1520}
1521
1522///
1523/// BoundSqlDdlExpressionKey
1524///
1525/// Accepted expression-index key target for SQL DDL binding.
1526///
1527#[derive(Clone, Debug, Eq, PartialEq)]
1528pub(in crate::db) struct BoundSqlDdlExpressionKey {
1529    op: PersistedIndexExpressionOp,
1530    source: BoundSqlDdlFieldPath,
1531    canonical_sql: String,
1532}
1533
1534impl BoundSqlDdlExpressionKey {
1535    /// Return the accepted expression operation.
1536    #[must_use]
1537    pub(in crate::db) const fn op(&self) -> PersistedIndexExpressionOp {
1538        self.op
1539    }
1540
1541    /// Borrow the accepted source field path.
1542    #[must_use]
1543    pub(in crate::db) const fn source(&self) -> &BoundSqlDdlFieldPath {
1544        &self.source
1545    }
1546
1547    /// Borrow the SQL-facing canonical expression text.
1548    #[must_use]
1549    pub(in crate::db) const fn canonical_sql(&self) -> &str {
1550        self.canonical_sql.as_str()
1551    }
1552}
1553
1554fn bind_create_index_key_item(
1555    key_item: &SqlCreateIndexKeyItem,
1556    entity_name: &str,
1557    schema: &SchemaInfo,
1558) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1559    match key_item {
1560        SqlCreateIndexKeyItem::FieldPath(field_path) => {
1561            bind_create_index_field_path(field_path.as_str(), entity_name, schema)
1562                .map(BoundSqlDdlCreateIndexKey::FieldPath)
1563        }
1564        SqlCreateIndexKeyItem::Expression(expression) => {
1565            bind_create_index_expression_key(expression, entity_name, schema)
1566        }
1567    }
1568}
1569
1570fn bind_create_index_expression_key(
1571    expression: &SqlCreateIndexExpressionKey,
1572    entity_name: &str,
1573    schema: &SchemaInfo,
1574) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1575    let source = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
1576
1577    Ok(BoundSqlDdlCreateIndexKey::Expression(
1578        BoundSqlDdlExpressionKey {
1579            op: expression_op_from_sql_function(expression.function),
1580            source,
1581            canonical_sql: expression.canonical_sql(),
1582        },
1583    ))
1584}
1585
1586const fn expression_op_from_sql_function(
1587    function: crate::db::sql::parser::SqlCreateIndexExpressionFunction,
1588) -> PersistedIndexExpressionOp {
1589    match function {
1590        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Lower => {
1591            PersistedIndexExpressionOp::Lower
1592        }
1593        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Upper => {
1594            PersistedIndexExpressionOp::Upper
1595        }
1596        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Trim => {
1597            PersistedIndexExpressionOp::Trim
1598        }
1599    }
1600}
1601
1602fn key_items_are_field_path_only(key_items: &[BoundSqlDdlCreateIndexKey]) -> bool {
1603    key_items
1604        .iter()
1605        .all(|key_item| matches!(key_item, BoundSqlDdlCreateIndexKey::FieldPath(_)))
1606}
1607
1608fn create_index_field_path_report_items(
1609    key_items: &[BoundSqlDdlCreateIndexKey],
1610) -> Vec<BoundSqlDdlFieldPath> {
1611    key_items
1612        .iter()
1613        .map(|key_item| match key_item {
1614            BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.clone(),
1615            BoundSqlDdlCreateIndexKey::Expression(expression) => expression.source().clone(),
1616        })
1617        .collect()
1618}
1619
1620fn bind_create_index_field_path(
1621    field_path: &str,
1622    entity_name: &str,
1623    schema: &SchemaInfo,
1624) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
1625    let mut path = field_path
1626        .split('.')
1627        .map(str::trim)
1628        .filter(|segment| !segment.is_empty());
1629    let Some(root) = path.next() else {
1630        return Err(SqlDdlBindError::UnknownFieldPath {
1631            entity_name: entity_name.to_string(),
1632            field_path: field_path.to_string(),
1633        });
1634    };
1635    let segments = path.map(str::to_string).collect::<Vec<_>>();
1636
1637    let capabilities = if segments.is_empty() {
1638        schema.sql_capabilities(root)
1639    } else {
1640        schema.nested_sql_capabilities(root, segments.as_slice())
1641    }
1642    .ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
1643        entity_name: entity_name.to_string(),
1644        field_path: field_path.to_string(),
1645    })?;
1646
1647    if !capabilities.orderable() {
1648        return Err(SqlDdlBindError::FieldPathNotIndexable {
1649            field_path: field_path.to_string(),
1650        });
1651    }
1652
1653    let mut accepted_path = Vec::with_capacity(segments.len() + 1);
1654    accepted_path.push(root.to_string());
1655    accepted_path.extend(segments.iter().cloned());
1656
1657    Ok(BoundSqlDdlFieldPath {
1658        root: root.to_string(),
1659        segments,
1660        accepted_path,
1661    })
1662}
1663
1664fn find_field_path_index_by_name<'a>(
1665    schema: &'a SchemaInfo,
1666    index_name: &str,
1667) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
1668    schema
1669        .field_path_indexes()
1670        .iter()
1671        .find(|index| index.name() == index_name)
1672}
1673
1674fn existing_field_path_index_matches_request(
1675    index: &crate::db::schema::SchemaIndexInfo,
1676    field_paths: &[BoundSqlDdlFieldPath],
1677    predicate_sql: Option<&str>,
1678    uniqueness: SqlCreateIndexUniqueness,
1679) -> bool {
1680    let fields = index.fields();
1681
1682    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1683        && index.predicate_sql() == predicate_sql
1684        && fields.len() == field_paths.len()
1685        && fields
1686            .iter()
1687            .zip(field_paths)
1688            .all(|(field, requested)| field.path() == requested.accepted_path())
1689}
1690
1691fn find_expression_index_by_name<'a>(
1692    schema: &'a SchemaInfo,
1693    index_name: &str,
1694) -> Option<&'a SchemaExpressionIndexInfo> {
1695    schema
1696        .expression_indexes()
1697        .iter()
1698        .find(|index| index.name() == index_name)
1699}
1700
1701fn existing_expression_index_matches_request(
1702    index: &SchemaExpressionIndexInfo,
1703    key_items: &[BoundSqlDdlCreateIndexKey],
1704    predicate_sql: Option<&str>,
1705    uniqueness: SqlCreateIndexUniqueness,
1706) -> bool {
1707    let existing_key_items = index.key_items();
1708
1709    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1710        && index.predicate_sql() == predicate_sql
1711        && existing_key_items.len() == key_items.len()
1712        && existing_key_items
1713            .iter()
1714            .zip(key_items)
1715            .all(existing_expression_key_item_matches_request)
1716}
1717
1718fn existing_expression_key_item_matches_request(
1719    existing: (
1720        &SchemaExpressionIndexKeyItemInfo,
1721        &BoundSqlDdlCreateIndexKey,
1722    ),
1723) -> bool {
1724    let (existing, requested) = existing;
1725    match (existing, requested) {
1726        (
1727            SchemaExpressionIndexKeyItemInfo::FieldPath(existing),
1728            BoundSqlDdlCreateIndexKey::FieldPath(requested),
1729        ) => existing.path() == requested.accepted_path(),
1730        (
1731            SchemaExpressionIndexKeyItemInfo::Expression(existing),
1732            BoundSqlDdlCreateIndexKey::Expression(requested),
1733        ) => existing_expression_component_matches_request(
1734            existing.op(),
1735            existing.source().path(),
1736            existing.canonical_text(),
1737            requested,
1738        ),
1739        _ => false,
1740    }
1741}
1742
1743fn existing_expression_component_matches_request(
1744    existing_op: PersistedIndexExpressionOp,
1745    existing_path: &[String],
1746    existing_canonical_text: &str,
1747    requested: &BoundSqlDdlExpressionKey,
1748) -> bool {
1749    let requested_path = requested.source().accepted_path();
1750    let requested_canonical_text = format!("expr:v1:{}", requested.canonical_sql());
1751
1752    existing_op == requested.op()
1753        && existing_path == requested_path
1754        && existing_canonical_text == requested_canonical_text
1755}
1756
1757fn reject_duplicate_expression_index(
1758    key_items: &[BoundSqlDdlCreateIndexKey],
1759    predicate_sql: Option<&str>,
1760    schema: &SchemaInfo,
1761) -> Result<(), SqlDdlBindError> {
1762    let Some(existing_index) = schema.expression_indexes().iter().find(|index| {
1763        existing_expression_index_matches_request(
1764            index,
1765            key_items,
1766            predicate_sql,
1767            if index.unique() {
1768                SqlCreateIndexUniqueness::Unique
1769            } else {
1770                SqlCreateIndexUniqueness::NonUnique
1771            },
1772        )
1773    }) else {
1774        return Ok(());
1775    };
1776
1777    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1778        field_path: ddl_key_item_report(key_items).join(","),
1779        existing_index: existing_index.name().to_string(),
1780    })
1781}
1782
1783fn reject_duplicate_field_path_index(
1784    field_paths: &[BoundSqlDdlFieldPath],
1785    predicate_sql: Option<&str>,
1786    schema: &SchemaInfo,
1787) -> Result<(), SqlDdlBindError> {
1788    let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
1789        let fields = index.fields();
1790        index.predicate_sql() == predicate_sql
1791            && fields.len() == field_paths.len()
1792            && fields
1793                .iter()
1794                .zip(field_paths)
1795                .all(|(field, requested)| field.path() == requested.accepted_path())
1796    }) else {
1797        return Ok(());
1798    };
1799
1800    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1801        field_path: ddl_field_path_report(field_paths).join(","),
1802        existing_index: existing_index.name().to_string(),
1803    })
1804}
1805
1806fn candidate_index_snapshot(
1807    index_name: &str,
1808    key_items: &[BoundSqlDdlCreateIndexKey],
1809    predicate_sql: Option<&str>,
1810    uniqueness: SqlCreateIndexUniqueness,
1811    schema: &SchemaInfo,
1812    index_store_path: &'static str,
1813) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
1814    let key = if key_items_are_field_path_only(key_items) {
1815        PersistedIndexKeySnapshot::FieldPath(
1816            key_items
1817                .iter()
1818                .map(|key_item| {
1819                    let BoundSqlDdlCreateIndexKey::FieldPath(field_path) = key_item else {
1820                        unreachable!("field-path-only index checked before field-path lowering");
1821                    };
1822
1823                    accepted_index_field_path_snapshot(schema, field_path)
1824                })
1825                .collect::<Result<Vec<_>, _>>()?,
1826        )
1827    } else {
1828        PersistedIndexKeySnapshot::Items(
1829            key_items
1830                .iter()
1831                .map(|key_item| match key_item {
1832                    BoundSqlDdlCreateIndexKey::FieldPath(field_path) => {
1833                        accepted_index_field_path_snapshot(schema, field_path)
1834                            .map(PersistedIndexKeyItemSnapshot::FieldPath)
1835                    }
1836                    BoundSqlDdlCreateIndexKey::Expression(expression) => {
1837                        accepted_index_expression_snapshot(schema, expression)
1838                    }
1839                })
1840                .collect::<Result<Vec<_>, _>>()?,
1841        )
1842    };
1843
1844    Ok(PersistedIndexSnapshot::new_sql_ddl(
1845        schema.next_secondary_index_ordinal(),
1846        index_name.to_string(),
1847        index_store_path.to_string(),
1848        matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
1849        key,
1850        predicate_sql.map(str::to_string),
1851    ))
1852}
1853
1854fn accepted_index_field_path_snapshot(
1855    schema: &SchemaInfo,
1856    field_path: &BoundSqlDdlFieldPath,
1857) -> Result<crate::db::schema::PersistedIndexFieldPathSnapshot, SqlDdlBindError> {
1858    schema
1859        .accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
1860        .ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
1861            field_path: field_path.accepted_path().join("."),
1862        })
1863}
1864
1865fn accepted_index_expression_snapshot(
1866    schema: &SchemaInfo,
1867    expression: &BoundSqlDdlExpressionKey,
1868) -> Result<PersistedIndexKeyItemSnapshot, SqlDdlBindError> {
1869    let source = accepted_index_field_path_snapshot(schema, expression.source())?;
1870    let Some(output_kind) = expression_output_kind(expression.op(), source.kind()) else {
1871        return Err(SqlDdlBindError::FieldPathNotIndexable {
1872            field_path: expression.source().accepted_path().join("."),
1873        });
1874    };
1875
1876    Ok(PersistedIndexKeyItemSnapshot::Expression(Box::new(
1877        PersistedIndexExpressionSnapshot::new(
1878            expression.op(),
1879            source.clone(),
1880            source.kind().clone(),
1881            output_kind,
1882            format!("expr:v1:{}", expression.canonical_sql()),
1883        ),
1884    )))
1885}
1886
1887fn expression_output_kind(
1888    op: PersistedIndexExpressionOp,
1889    source_kind: &PersistedFieldKind,
1890) -> Option<PersistedFieldKind> {
1891    match op {
1892        PersistedIndexExpressionOp::Lower
1893        | PersistedIndexExpressionOp::Upper
1894        | PersistedIndexExpressionOp::Trim
1895        | PersistedIndexExpressionOp::LowerTrim => {
1896            if matches!(source_kind, PersistedFieldKind::Text { .. }) {
1897                Some(source_kind.clone())
1898            } else {
1899                None
1900            }
1901        }
1902        PersistedIndexExpressionOp::Date => {
1903            if matches!(
1904                source_kind,
1905                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1906            ) {
1907                Some(PersistedFieldKind::Date)
1908            } else {
1909                None
1910            }
1911        }
1912        PersistedIndexExpressionOp::Year
1913        | PersistedIndexExpressionOp::Month
1914        | PersistedIndexExpressionOp::Day => {
1915            if matches!(
1916                source_kind,
1917                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1918            ) {
1919                Some(PersistedFieldKind::Int)
1920            } else {
1921                None
1922            }
1923        }
1924    }
1925}
1926
1927fn validated_create_index_predicate_sql(
1928    predicate_sql: Option<&str>,
1929    schema: &SchemaInfo,
1930) -> Result<Option<String>, SqlDdlBindError> {
1931    let Some(predicate_sql) = predicate_sql else {
1932        return Ok(None);
1933    };
1934    let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
1935        SqlDdlBindError::InvalidFilteredIndexPredicate {
1936            detail: error.to_string(),
1937        }
1938    })?;
1939    validate_predicate(schema, &predicate).map_err(|error| {
1940        SqlDdlBindError::InvalidFilteredIndexPredicate {
1941            detail: error.to_string(),
1942        }
1943    })?;
1944
1945    Ok(Some(predicate_sql.to_string()))
1946}
1947
1948fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
1949    match field_paths {
1950        [field_path] => field_path.accepted_path().to_vec(),
1951        _ => vec![
1952            field_paths
1953                .iter()
1954                .map(|field_path| field_path.accepted_path().join("."))
1955                .collect::<Vec<_>>()
1956                .join(","),
1957        ],
1958    }
1959}
1960
1961fn ddl_key_item_report(key_items: &[BoundSqlDdlCreateIndexKey]) -> Vec<String> {
1962    match key_items {
1963        [key_item] => vec![ddl_key_item_text(key_item)],
1964        _ => vec![
1965            key_items
1966                .iter()
1967                .map(ddl_key_item_text)
1968                .collect::<Vec<_>>()
1969                .join(","),
1970        ],
1971    }
1972}
1973
1974fn ddl_key_item_text(key_item: &BoundSqlDdlCreateIndexKey) -> String {
1975    match key_item {
1976        BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.accepted_path().join("."),
1977        BoundSqlDdlCreateIndexKey::Expression(expression) => expression.canonical_sql().to_string(),
1978    }
1979}
1980
1981/// Lower one bound SQL DDL request through schema mutation admission.
1982pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
1983    request: &BoundSqlDdlRequest,
1984) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
1985    match request.statement() {
1986        BoundSqlDdlStatement::AddColumn(add) => {
1987            Ok(admit_sql_ddl_field_addition_candidate(add.field()))
1988        }
1989        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
1990            Ok(admit_sql_ddl_field_default_candidate(alter.field()))
1991        }
1992        BoundSqlDdlStatement::AlterColumnNullability(alter) => {
1993            Ok(admit_sql_ddl_field_nullability_candidate(alter.field()))
1994        }
1995        BoundSqlDdlStatement::CreateIndex(create) => {
1996            if create.candidate_index().key().is_field_path_only() {
1997                admit_sql_ddl_field_path_index_candidate(create.candidate_index())
1998            } else {
1999                admit_sql_ddl_expression_index_candidate(create.candidate_index())
2000            }
2001        }
2002        BoundSqlDdlStatement::DropIndex(drop) => {
2003            admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
2004        }
2005        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
2006    }
2007    .map_err(SqlDdlLoweringError::MutationAdmission)
2008}
2009
2010/// Derive the accepted-after schema snapshot for one bound SQL DDL request.
2011pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
2012    accepted_before: &AcceptedSchemaSnapshot,
2013    request: &BoundSqlDdlRequest,
2014) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
2015    match request.statement() {
2016        BoundSqlDdlStatement::AddColumn(add) => {
2017            derive_sql_ddl_field_addition_accepted_after(accepted_before, add.field().clone())
2018        }
2019        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
2020            derive_sql_ddl_field_default_accepted_after(
2021                accepted_before,
2022                alter.field_name(),
2023                alter.default().clone(),
2024            )
2025        }
2026        BoundSqlDdlStatement::AlterColumnNullability(alter) => {
2027            derive_sql_ddl_field_nullability_accepted_after(
2028                accepted_before,
2029                alter.field_name(),
2030                alter.nullable(),
2031            )
2032        }
2033        BoundSqlDdlStatement::CreateIndex(create) => {
2034            if create.candidate_index().key().is_field_path_only() {
2035                derive_sql_ddl_field_path_index_accepted_after(
2036                    accepted_before,
2037                    create.candidate_index().clone(),
2038                )
2039            } else {
2040                derive_sql_ddl_expression_index_accepted_after(
2041                    accepted_before,
2042                    create.candidate_index().clone(),
2043                )
2044            }
2045        }
2046        BoundSqlDdlStatement::DropIndex(drop) => {
2047            derive_sql_ddl_secondary_index_drop_accepted_after(
2048                accepted_before,
2049                drop.dropped_index(),
2050            )
2051        }
2052        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
2053    }
2054    .map_err(SqlDdlLoweringError::MutationAdmission)
2055}
2056
2057fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
2058    match bound.statement() {
2059        BoundSqlDdlStatement::AddColumn(add) => SqlDdlPreparationReport {
2060            mutation_kind: if add.field().default().is_none() {
2061                SqlDdlMutationKind::AddNullableField
2062            } else {
2063                SqlDdlMutationKind::AddDefaultedField
2064            },
2065            target_index: add.field().name().to_string(),
2066            target_store: add.entity_name().to_string(),
2067            field_path: vec![add.field().name().to_string()],
2068            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2069            rows_scanned: 0,
2070            index_keys_written: 0,
2071        },
2072        BoundSqlDdlStatement::AlterColumnDefault(alter) => SqlDdlPreparationReport {
2073            mutation_kind: alter.mutation_kind(),
2074            target_index: alter.field_name().to_string(),
2075            target_store: alter.entity_name().to_string(),
2076            field_path: vec![alter.field_name().to_string()],
2077            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2078            rows_scanned: 0,
2079            index_keys_written: 0,
2080        },
2081        BoundSqlDdlStatement::AlterColumnNullability(alter) => SqlDdlPreparationReport {
2082            mutation_kind: alter.mutation_kind(),
2083            target_index: alter.field_name().to_string(),
2084            target_store: alter.entity_name().to_string(),
2085            field_path: vec![alter.field_name().to_string()],
2086            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2087            rows_scanned: 0,
2088            index_keys_written: 0,
2089        },
2090        BoundSqlDdlStatement::CreateIndex(create) => {
2091            let target = create.candidate_index();
2092
2093            SqlDdlPreparationReport {
2094                mutation_kind: if target.key().is_field_path_only() {
2095                    SqlDdlMutationKind::AddFieldPathIndex
2096                } else {
2097                    SqlDdlMutationKind::AddExpressionIndex
2098                },
2099                target_index: target.name().to_string(),
2100                target_store: target.store().to_string(),
2101                field_path: ddl_key_item_report(create.key_items()),
2102                execution_status: SqlDdlExecutionStatus::PreparedOnly,
2103                rows_scanned: 0,
2104                index_keys_written: 0,
2105            }
2106        }
2107        BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
2108            mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
2109            target_index: drop.index_name().to_string(),
2110            target_store: drop.dropped_index().store().to_string(),
2111            field_path: drop.field_path().to_vec(),
2112            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2113            rows_scanned: 0,
2114            index_keys_written: 0,
2115        },
2116        BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
2117            mutation_kind: no_op.mutation_kind(),
2118            target_index: no_op.index_name().to_string(),
2119            target_store: no_op.target_store().to_string(),
2120            field_path: no_op.field_path().to_vec(),
2121            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2122            rows_scanned: 0,
2123            index_keys_written: 0,
2124        },
2125    }
2126}