Skip to main content

icydb_core/db/sql/
ddl.rs

1//! Module: db::sql::ddl
2//! Responsibility: bind parsed SQL DDL to accepted schema catalog contracts.
3//! Does not own: mutation planning, physical index rebuilds, or SQL execution.
4//! Boundary: translates parser-owned DDL syntax into catalog-native requests.
5
6#![allow(
7    dead_code,
8    reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
9)]
10
11use crate::db::{
12    data::encode_runtime_value_for_accepted_field_contract,
13    predicate::parse_sql_predicate,
14    query::predicate::validate_predicate,
15    schema::{
16        AcceptedFieldDecodeContract, AcceptedSchemaSnapshot, FieldId, PersistedFieldKind,
17        PersistedFieldOrigin, PersistedFieldSnapshot, PersistedIndexExpressionOp,
18        PersistedIndexExpressionSnapshot, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
19        PersistedIndexSnapshot, SchemaDdlAcceptedSnapshotDerivation,
20        SchemaDdlIndexDropCandidateError, SchemaDdlMutationAdmission,
21        SchemaDdlMutationAdmissionError, SchemaExpressionIndexInfo,
22        SchemaExpressionIndexKeyItemInfo, SchemaFieldDefault, SchemaFieldSlot,
23        SchemaFieldWritePolicy, SchemaInfo, admit_sql_ddl_expression_index_candidate,
24        admit_sql_ddl_field_addition_candidate, admit_sql_ddl_field_default_candidate,
25        admit_sql_ddl_field_nullability_candidate, admit_sql_ddl_field_path_index_candidate,
26        admit_sql_ddl_field_rename_candidate, admit_sql_ddl_secondary_index_drop_candidate,
27        canonicalize_strict_sql_literal_for_persisted_kind,
28        derive_sql_ddl_expression_index_accepted_after,
29        derive_sql_ddl_field_addition_accepted_after, derive_sql_ddl_field_default_accepted_after,
30        derive_sql_ddl_field_nullability_accepted_after,
31        derive_sql_ddl_field_path_index_accepted_after, derive_sql_ddl_field_rename_accepted_after,
32        derive_sql_ddl_secondary_index_drop_accepted_after,
33        resolve_sql_ddl_field_drop_dependent_index, resolve_sql_ddl_secondary_index_drop_candidate,
34    },
35    sql::{
36        identifier::identifiers_tail_match,
37        parser::{
38            SqlAlterColumnAction, SqlAlterTableAddColumnStatement,
39            SqlAlterTableAlterColumnStatement, SqlAlterTableDropColumnStatement,
40            SqlAlterTableRenameColumnStatement, SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem,
41            SqlCreateIndexStatement, SqlCreateIndexUniqueness, SqlDdlStatement,
42            SqlDropIndexStatement, SqlStatement,
43        },
44    },
45};
46use crate::model::field::{FieldStorageDecode, LeafCodec, ScalarCodec};
47use thiserror::Error as ThisError;
48
49///
50/// PreparedSqlDdlCommand
51///
52/// Fully prepared SQL DDL command. This is intentionally not executable yet:
53/// it packages the accepted-catalog binding, accepted-after derivation, and
54/// schema mutation admission proof for the future execution boundary.
55///
56#[derive(Clone, Debug, Eq, PartialEq)]
57pub(in crate::db) struct PreparedSqlDdlCommand {
58    bound: BoundSqlDdlRequest,
59    derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
60    report: SqlDdlPreparationReport,
61}
62
63impl PreparedSqlDdlCommand {
64    /// Borrow the accepted-catalog-bound DDL request.
65    #[must_use]
66    pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
67        &self.bound
68    }
69
70    /// Borrow the accepted-after derivation proof.
71    #[must_use]
72    pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
73        self.derivation.as_ref()
74    }
75
76    /// Borrow the developer-facing preparation report.
77    #[must_use]
78    pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
79        &self.report
80    }
81
82    /// Return whether this prepared command needs schema or storage mutation.
83    #[must_use]
84    pub(in crate::db) const fn mutates_schema(&self) -> bool {
85        self.derivation.is_some()
86    }
87}
88
89///
90/// SqlDdlPreparationReport
91///
92/// Compact report for a DDL command that has passed all pre-execution
93/// frontend and schema-mutation checks.
94///
95#[derive(Clone, Debug, Eq, PartialEq)]
96pub struct SqlDdlPreparationReport {
97    mutation_kind: SqlDdlMutationKind,
98    target_index: String,
99    target_store: String,
100    field_path: Vec<String>,
101    execution_status: SqlDdlExecutionStatus,
102    rows_scanned: usize,
103    index_keys_written: usize,
104}
105
106impl SqlDdlPreparationReport {
107    /// Return the prepared DDL mutation kind.
108    #[must_use]
109    pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
110        self.mutation_kind
111    }
112
113    /// Borrow the target accepted index name.
114    #[must_use]
115    pub const fn target_index(&self) -> &str {
116        self.target_index.as_str()
117    }
118
119    /// Borrow the target accepted index store path.
120    #[must_use]
121    pub const fn target_store(&self) -> &str {
122        self.target_store.as_str()
123    }
124
125    /// Borrow the target field path.
126    #[must_use]
127    pub const fn field_path(&self) -> &[String] {
128        self.field_path.as_slice()
129    }
130
131    /// Return the execution status captured by this DDL report.
132    #[must_use]
133    pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
134        self.execution_status
135    }
136
137    /// Return rows scanned by DDL execution.
138    #[must_use]
139    pub const fn rows_scanned(&self) -> usize {
140        self.rows_scanned
141    }
142
143    /// Return index keys written by DDL execution.
144    #[must_use]
145    pub const fn index_keys_written(&self) -> usize {
146        self.index_keys_written
147    }
148
149    pub(in crate::db) const fn with_execution_status(
150        mut self,
151        execution_status: SqlDdlExecutionStatus,
152    ) -> Self {
153        self.execution_status = execution_status;
154        self
155    }
156
157    pub(in crate::db) const fn with_execution_metrics(
158        mut self,
159        rows_scanned: usize,
160        index_keys_written: usize,
161    ) -> Self {
162        self.rows_scanned = rows_scanned;
163        self.index_keys_written = index_keys_written;
164        self
165    }
166}
167
168///
169/// SqlDdlMutationKind
170///
171/// Developer-facing SQL DDL mutation kind.
172///
173#[derive(Clone, Copy, Debug, Eq, PartialEq)]
174pub enum SqlDdlMutationKind {
175    AddDefaultedField,
176    AddNullableField,
177    SetFieldDefault,
178    DropFieldDefault,
179    SetFieldNotNull,
180    DropFieldNotNull,
181    DropField,
182    RenameField,
183    AddFieldPathIndex,
184    AddExpressionIndex,
185    DropSecondaryIndex,
186}
187
188impl SqlDdlMutationKind {
189    /// Return the stable diagnostic label for this DDL mutation kind.
190    #[must_use]
191    pub const fn as_str(self) -> &'static str {
192        match self {
193            Self::AddDefaultedField => "add_defaulted_field",
194            Self::AddNullableField => "add_nullable_field",
195            Self::SetFieldDefault => "set_field_default",
196            Self::DropFieldDefault => "drop_field_default",
197            Self::SetFieldNotNull => "set_field_not_null",
198            Self::DropFieldNotNull => "drop_field_not_null",
199            Self::DropField => "drop_field",
200            Self::RenameField => "rename_field",
201            Self::AddFieldPathIndex => "add_field_path_index",
202            Self::AddExpressionIndex => "add_expression_index",
203            Self::DropSecondaryIndex => "drop_secondary_index",
204        }
205    }
206}
207
208///
209/// SqlDdlExecutionStatus
210///
211/// SQL DDL execution state at the current boundary.
212///
213#[derive(Clone, Copy, Debug, Eq, PartialEq)]
214pub enum SqlDdlExecutionStatus {
215    PreparedOnly,
216    Published,
217    NoOp,
218}
219
220impl SqlDdlExecutionStatus {
221    /// Return the stable diagnostic label for this execution status.
222    #[must_use]
223    pub const fn as_str(self) -> &'static str {
224        match self {
225            Self::PreparedOnly => "prepared_only",
226            Self::Published => "published",
227            Self::NoOp => "no_op",
228        }
229    }
230}
231
232///
233/// BoundSqlDdlRequest
234///
235/// Accepted-catalog SQL DDL request after parser syntax has been resolved
236/// against one runtime schema snapshot.
237///
238#[derive(Clone, Debug, Eq, PartialEq)]
239pub(in crate::db) struct BoundSqlDdlRequest {
240    statement: BoundSqlDdlStatement,
241}
242
243impl BoundSqlDdlRequest {
244    /// Borrow the bound statement payload.
245    #[must_use]
246    pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
247        &self.statement
248    }
249}
250
251///
252/// BoundSqlDdlStatement
253///
254/// Catalog-resolved DDL statement vocabulary.
255///
256#[derive(Clone, Debug, Eq, PartialEq)]
257pub(in crate::db) enum BoundSqlDdlStatement {
258    AddColumn(BoundSqlAddColumnRequest),
259    AlterColumnDefault(BoundSqlAlterColumnDefaultRequest),
260    AlterColumnNullability(BoundSqlAlterColumnNullabilityRequest),
261    RenameColumn(BoundSqlRenameColumnRequest),
262    CreateIndex(BoundSqlCreateIndexRequest),
263    DropIndex(BoundSqlDropIndexRequest),
264    NoOp(BoundSqlDdlNoOpRequest),
265}
266
267///
268/// BoundSqlAddColumnRequest
269///
270/// Catalog-resolved additive field DDL request.
271///
272#[derive(Clone, Debug, Eq, PartialEq)]
273pub(in crate::db) struct BoundSqlAddColumnRequest {
274    entity_name: String,
275    field: PersistedFieldSnapshot,
276}
277
278impl BoundSqlAddColumnRequest {
279    /// Borrow the accepted entity name.
280    #[must_use]
281    pub(in crate::db) const fn entity_name(&self) -> &str {
282        self.entity_name.as_str()
283    }
284
285    /// Borrow the accepted DDL-owned field snapshot to publish.
286    #[must_use]
287    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
288        &self.field
289    }
290}
291
292///
293/// BoundSqlAlterColumnDefaultRequest
294///
295/// Catalog-resolved field-default metadata DDL request.
296///
297#[derive(Clone, Debug, Eq, PartialEq)]
298pub(in crate::db) struct BoundSqlAlterColumnDefaultRequest {
299    entity_name: String,
300    field: PersistedFieldSnapshot,
301    default: SchemaFieldDefault,
302    mutation_kind: SqlDdlMutationKind,
303}
304
305impl BoundSqlAlterColumnDefaultRequest {
306    /// Borrow the accepted entity name.
307    #[must_use]
308    pub(in crate::db) const fn entity_name(&self) -> &str {
309        self.entity_name.as_str()
310    }
311
312    /// Borrow the accepted field name.
313    #[must_use]
314    pub(in crate::db) const fn field_name(&self) -> &str {
315        self.field.name()
316    }
317
318    /// Borrow the accepted field whose default will change.
319    #[must_use]
320    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
321        &self.field
322    }
323
324    /// Borrow the default contract to publish.
325    #[must_use]
326    pub(in crate::db) const fn default(&self) -> &SchemaFieldDefault {
327        &self.default
328    }
329
330    /// Return the field-default mutation kind.
331    #[must_use]
332    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
333        self.mutation_kind
334    }
335}
336
337///
338/// BoundSqlAlterColumnNullabilityRequest
339///
340/// Catalog-resolved field-nullability metadata DDL request.
341///
342#[derive(Clone, Debug, Eq, PartialEq)]
343pub(in crate::db) struct BoundSqlAlterColumnNullabilityRequest {
344    entity_name: String,
345    field: PersistedFieldSnapshot,
346    nullable: bool,
347    mutation_kind: SqlDdlMutationKind,
348}
349
350impl BoundSqlAlterColumnNullabilityRequest {
351    /// Borrow the accepted entity name.
352    #[must_use]
353    pub(in crate::db) const fn entity_name(&self) -> &str {
354        self.entity_name.as_str()
355    }
356
357    /// Borrow the accepted field name.
358    #[must_use]
359    pub(in crate::db) const fn field_name(&self) -> &str {
360        self.field.name()
361    }
362
363    /// Borrow the accepted field whose nullability will change.
364    #[must_use]
365    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
366        &self.field
367    }
368
369    /// Return the nullable contract to publish.
370    #[must_use]
371    pub(in crate::db) const fn nullable(&self) -> bool {
372        self.nullable
373    }
374
375    /// Return the field-nullability mutation kind.
376    #[must_use]
377    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
378        self.mutation_kind
379    }
380}
381
382///
383/// BoundSqlRenameColumnRequest
384///
385/// Catalog-resolved field-rename metadata DDL request.
386///
387#[derive(Clone, Debug, Eq, PartialEq)]
388pub(in crate::db) struct BoundSqlRenameColumnRequest {
389    entity_name: String,
390    field: PersistedFieldSnapshot,
391    new_name: String,
392}
393
394impl BoundSqlRenameColumnRequest {
395    /// Borrow the accepted entity name.
396    #[must_use]
397    pub(in crate::db) const fn entity_name(&self) -> &str {
398        self.entity_name.as_str()
399    }
400
401    /// Borrow the accepted source field name.
402    #[must_use]
403    pub(in crate::db) const fn old_name(&self) -> &str {
404        self.field.name()
405    }
406
407    /// Borrow the accepted target field name.
408    #[must_use]
409    pub(in crate::db) const fn new_name(&self) -> &str {
410        self.new_name.as_str()
411    }
412
413    /// Borrow the accepted source field.
414    #[must_use]
415    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
416        &self.field
417    }
418}
419
420///
421/// BoundSqlDdlNoOpRequest
422///
423/// Catalog-resolved idempotent DDL request that is already satisfied or absent.
424///
425#[derive(Clone, Debug, Eq, PartialEq)]
426pub(in crate::db) struct BoundSqlDdlNoOpRequest {
427    mutation_kind: SqlDdlMutationKind,
428    index_name: String,
429    entity_name: String,
430    target_store: String,
431    field_path: Vec<String>,
432}
433
434impl BoundSqlDdlNoOpRequest {
435    /// Return the user-facing mutation family this no-op belongs to.
436    #[must_use]
437    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
438        self.mutation_kind
439    }
440
441    /// Borrow the requested index name.
442    #[must_use]
443    pub(in crate::db) const fn index_name(&self) -> &str {
444        self.index_name.as_str()
445    }
446
447    /// Borrow the accepted entity name that owns this request.
448    #[must_use]
449    pub(in crate::db) const fn entity_name(&self) -> &str {
450        self.entity_name.as_str()
451    }
452
453    /// Borrow the accepted index store path, or `-` when no target exists.
454    #[must_use]
455    pub(in crate::db) const fn target_store(&self) -> &str {
456        self.target_store.as_str()
457    }
458
459    /// Borrow the target field path, empty when no target exists.
460    #[must_use]
461    pub(in crate::db) const fn field_path(&self) -> &[String] {
462        self.field_path.as_slice()
463    }
464}
465
466///
467/// BoundSqlCreateIndexRequest
468///
469/// Catalog-resolved request for adding one secondary index.
470///
471#[derive(Clone, Debug, Eq, PartialEq)]
472pub(in crate::db) struct BoundSqlCreateIndexRequest {
473    index_name: String,
474    entity_name: String,
475    key_items: Vec<BoundSqlDdlCreateIndexKey>,
476    field_paths: Vec<BoundSqlDdlFieldPath>,
477    candidate_index: PersistedIndexSnapshot,
478}
479
480impl BoundSqlCreateIndexRequest {
481    /// Borrow the requested index name.
482    #[must_use]
483    pub(in crate::db) const fn index_name(&self) -> &str {
484        self.index_name.as_str()
485    }
486
487    /// Borrow the accepted entity name that owns this request.
488    #[must_use]
489    pub(in crate::db) const fn entity_name(&self) -> &str {
490        self.entity_name.as_str()
491    }
492
493    /// Borrow the accepted field-path targets.
494    #[must_use]
495    pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
496        self.field_paths.as_slice()
497    }
498
499    /// Borrow the accepted key targets in DDL key order.
500    #[must_use]
501    pub(in crate::db) const fn key_items(&self) -> &[BoundSqlDdlCreateIndexKey] {
502        self.key_items.as_slice()
503    }
504
505    /// Borrow the candidate accepted index snapshot for mutation admission.
506    #[must_use]
507    pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
508        &self.candidate_index
509    }
510}
511
512///
513/// BoundSqlDropIndexRequest
514///
515/// Catalog-resolved request for dropping one DDL-published secondary index.
516///
517#[derive(Clone, Debug, Eq, PartialEq)]
518pub(in crate::db) struct BoundSqlDropIndexRequest {
519    index_name: String,
520    entity_name: String,
521    dropped_index: PersistedIndexSnapshot,
522    field_path: Vec<String>,
523}
524
525impl BoundSqlDropIndexRequest {
526    /// Borrow the requested index name.
527    #[must_use]
528    pub(in crate::db) const fn index_name(&self) -> &str {
529        self.index_name.as_str()
530    }
531
532    /// Borrow the accepted entity name that owns this request.
533    #[must_use]
534    pub(in crate::db) const fn entity_name(&self) -> &str {
535        self.entity_name.as_str()
536    }
537
538    /// Borrow the accepted index snapshot that will be removed.
539    #[must_use]
540    pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
541        &self.dropped_index
542    }
543
544    /// Borrow the dropped field-path target.
545    #[must_use]
546    pub(in crate::db) const fn field_path(&self) -> &[String] {
547        self.field_path.as_slice()
548    }
549}
550
551///
552/// BoundSqlDdlFieldPath
553///
554/// Accepted field-path target for SQL DDL binding.
555///
556#[derive(Clone, Debug, Eq, PartialEq)]
557pub(in crate::db) struct BoundSqlDdlFieldPath {
558    root: String,
559    segments: Vec<String>,
560    accepted_path: Vec<String>,
561}
562
563impl BoundSqlDdlFieldPath {
564    /// Borrow the top-level field name.
565    #[must_use]
566    pub(in crate::db) const fn root(&self) -> &str {
567        self.root.as_str()
568    }
569
570    /// Borrow nested path segments below the top-level field.
571    #[must_use]
572    pub(in crate::db) const fn segments(&self) -> &[String] {
573        self.segments.as_slice()
574    }
575
576    /// Borrow the full accepted field path used by index metadata.
577    #[must_use]
578    pub(in crate::db) const fn accepted_path(&self) -> &[String] {
579        self.accepted_path.as_slice()
580    }
581}
582
583///
584/// SqlDdlBindError
585///
586/// Typed fail-closed reasons for SQL DDL catalog binding.
587///
588#[derive(Debug, Eq, PartialEq, ThisError)]
589pub(in crate::db) enum SqlDdlBindError {
590    #[error("SQL DDL binder requires a DDL statement")]
591    NotDdl,
592
593    #[error("accepted schema does not expose an entity name")]
594    MissingEntityName,
595
596    #[error("accepted schema does not expose an entity path")]
597    MissingEntityPath,
598
599    #[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
600    EntityMismatch {
601        sql_entity: String,
602        expected_entity: String,
603    },
604
605    #[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
606    UnknownFieldPath {
607        entity_name: String,
608        field_path: String,
609    },
610
611    #[error("field path '{field_path}' is not indexable")]
612    FieldPathNotIndexable { field_path: String },
613
614    #[error("field path '{field_path}' depends on generated-only metadata")]
615    FieldPathNotAcceptedCatalogBacked { field_path: String },
616
617    #[error("invalid filtered index predicate: {detail}")]
618    InvalidFilteredIndexPredicate { detail: String },
619
620    #[error("index name '{index_name}' already exists in the accepted schema")]
621    DuplicateIndexName { index_name: String },
622
623    #[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
624    DuplicateFieldPathIndex {
625        field_path: String,
626        existing_index: String,
627    },
628
629    #[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
630    UnknownIndex {
631        entity_name: String,
632        index_name: String,
633    },
634
635    #[error(
636        "index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
637    )]
638    GeneratedIndexDropRejected { index_name: String },
639
640    #[error(
641        "index '{index_name}' is not a supported DDL-droppable secondary index; SQL DDL can currently drop only indexes created through SQL DDL"
642    )]
643    UnsupportedDropIndex { index_name: String },
644
645    #[error(
646        "SQL DDL ALTER TABLE ADD COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
647    )]
648    UnsupportedAlterTableAddColumn {
649        entity_name: String,
650        column_name: String,
651    },
652
653    #[error(
654        "SQL DDL ALTER TABLE ADD COLUMN DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
655    )]
656    InvalidAlterTableAddColumnDefault {
657        entity_name: String,
658        column_name: String,
659        detail: String,
660    },
661
662    #[error(
663        "SQL DDL ALTER TABLE ADD COLUMN NOT NULL is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
664    )]
665    UnsupportedAlterTableAddColumnNotNull {
666        entity_name: String,
667        column_name: String,
668    },
669
670    #[error("field '{column_name}' already exists in accepted entity '{entity_name}'")]
671    DuplicateColumn {
672        entity_name: String,
673        column_name: String,
674    },
675
676    #[error(
677        "SQL DDL ALTER TABLE ADD COLUMN type '{column_type}' is not supported yet for accepted entity '{entity_name}' column '{column_name}'"
678    )]
679    UnsupportedAlterTableAddColumnType {
680        entity_name: String,
681        column_name: String,
682        column_type: String,
683    },
684
685    #[error("unknown column '{column_name}' for accepted entity '{entity_name}'")]
686    UnknownColumn {
687        entity_name: String,
688        column_name: String,
689    },
690
691    #[error(
692        "SQL DDL ALTER TABLE ALTER COLUMN {action} is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
693    )]
694    UnsupportedAlterTableAlterColumn {
695        entity_name: String,
696        column_name: String,
697        action: String,
698    },
699
700    #[error(
701        "SQL DDL ALTER TABLE ALTER COLUMN SET DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
702    )]
703    InvalidAlterTableAlterColumnDefault {
704        entity_name: String,
705        column_name: String,
706        detail: String,
707    },
708
709    #[error(
710        "SQL DDL ALTER TABLE ALTER COLUMN DROP DEFAULT is not executable yet for required accepted entity '{entity_name}' column '{column_name}'"
711    )]
712    UnsupportedAlterTableDropDefaultRequired {
713        entity_name: String,
714        column_name: String,
715    },
716
717    #[error(
718        "SQL DDL ALTER TABLE ALTER COLUMN DEFAULT cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema default instead"
719    )]
720    GeneratedFieldDefaultChangeRejected {
721        entity_name: String,
722        column_name: String,
723    },
724
725    #[error(
726        "SQL DDL ALTER TABLE ALTER COLUMN NULLABILITY cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema nullability instead"
727    )]
728    GeneratedFieldNullabilityChangeRejected {
729        entity_name: String,
730        column_name: String,
731    },
732
733    #[error(
734        "SQL DDL ALTER TABLE DROP COLUMN cannot drop primary-key field '{column_name}' on entity '{entity_name}'"
735    )]
736    PrimaryKeyFieldDropRejected {
737        entity_name: String,
738        column_name: String,
739    },
740
741    #[error(
742        "SQL DDL ALTER TABLE DROP COLUMN cannot change generated accepted field '{column_name}' on entity '{entity_name}'; remove the field from the Rust schema instead"
743    )]
744    GeneratedFieldDropRejected {
745        entity_name: String,
746        column_name: String,
747    },
748
749    #[error(
750        "SQL DDL ALTER TABLE DROP COLUMN cannot drop accepted field '{column_name}' on entity '{entity_name}' while index '{index_name}' depends on it; drop dependent DDL-owned indexes first"
751    )]
752    IndexedFieldDropRejected {
753        entity_name: String,
754        column_name: String,
755        index_name: String,
756    },
757
758    #[error(
759        "SQL DDL ALTER TABLE DROP COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'; retained-slot field removal policy is not enabled yet"
760    )]
761    UnsupportedAlterTableDropColumn {
762        entity_name: String,
763        column_name: String,
764    },
765
766    #[error(
767        "SQL DDL ALTER TABLE RENAME COLUMN cannot change generated accepted field '{column_name}' on entity '{entity_name}'; rename the field in the Rust schema instead"
768    )]
769    GeneratedFieldRenameRejected {
770        entity_name: String,
771        column_name: String,
772    },
773
774    #[error(
775        "SQL DDL ALTER TABLE RENAME COLUMN is not executable for accepted entity '{entity_name}' column '{old_column_name}' to '{new_column_name}'"
776    )]
777    UnsupportedAlterTableRenameColumn {
778        entity_name: String,
779        old_column_name: String,
780        new_column_name: String,
781    },
782}
783
784///
785/// SqlDdlLoweringError
786///
787/// Typed fail-closed reasons while lowering bound DDL into schema mutation
788/// admission.
789///
790#[derive(Debug, Eq, PartialEq, ThisError)]
791pub(in crate::db) enum SqlDdlLoweringError {
792    #[error("SQL DDL lowering requires a supported DDL statement")]
793    UnsupportedStatement,
794
795    #[error("schema mutation admission rejected DDL candidate: {0:?}")]
796    MutationAdmission(SchemaDdlMutationAdmissionError),
797}
798
799///
800/// SqlDdlPrepareError
801///
802/// Typed fail-closed preparation errors for SQL DDL.
803///
804#[derive(Debug, Eq, PartialEq, ThisError)]
805pub(in crate::db) enum SqlDdlPrepareError {
806    #[error("{0}")]
807    Bind(#[from] SqlDdlBindError),
808
809    #[error("{0}")]
810    Lowering(#[from] SqlDdlLoweringError),
811}
812
813/// Prepare one parsed SQL DDL statement through every pre-execution proof.
814pub(in crate::db) fn prepare_sql_ddl_statement(
815    statement: &SqlStatement,
816    accepted_before: &AcceptedSchemaSnapshot,
817    schema: &SchemaInfo,
818    index_store_path: &'static str,
819) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
820    let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
821    let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
822        None
823    } else {
824        Some(derive_bound_sql_ddl_accepted_after(
825            accepted_before,
826            &bound,
827        )?)
828    };
829    let report = ddl_preparation_report(&bound);
830
831    Ok(PreparedSqlDdlCommand {
832        bound,
833        derivation,
834        report,
835    })
836}
837
838/// Bind one parsed SQL DDL statement against accepted catalog metadata.
839pub(in crate::db) fn bind_sql_ddl_statement(
840    statement: &SqlStatement,
841    accepted_before: &AcceptedSchemaSnapshot,
842    schema: &SchemaInfo,
843    index_store_path: &'static str,
844) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
845    let SqlStatement::Ddl(ddl) = statement else {
846        return Err(SqlDdlBindError::NotDdl);
847    };
848
849    match ddl {
850        SqlDdlStatement::CreateIndex(statement) => {
851            bind_create_index_statement(statement, schema, index_store_path)
852        }
853        SqlDdlStatement::DropIndex(statement) => {
854            bind_drop_index_statement(statement, accepted_before, schema)
855        }
856        SqlDdlStatement::AlterTableAddColumn(statement) => {
857            bind_alter_table_add_column_statement(statement, accepted_before, schema)
858        }
859        SqlDdlStatement::AlterTableAlterColumn(statement) => {
860            bind_alter_table_alter_column_statement(statement, accepted_before, schema)
861        }
862        SqlDdlStatement::AlterTableDropColumn(statement) => {
863            bind_alter_table_drop_column_statement(statement, accepted_before, schema)
864        }
865        SqlDdlStatement::AlterTableRenameColumn(statement) => {
866            bind_alter_table_rename_column_statement(statement, accepted_before, schema)
867        }
868    }
869}
870
871fn bind_create_index_statement(
872    statement: &SqlCreateIndexStatement,
873    schema: &SchemaInfo,
874    index_store_path: &'static str,
875) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
876    let entity_name = schema
877        .entity_name()
878        .ok_or(SqlDdlBindError::MissingEntityName)?;
879
880    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
881        return Err(SqlDdlBindError::EntityMismatch {
882            sql_entity: statement.entity.clone(),
883            expected_entity: entity_name.to_string(),
884        });
885    }
886
887    let key_items = statement
888        .key_items
889        .iter()
890        .map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
891        .collect::<Result<Vec<_>, _>>()?;
892    let field_paths = create_index_field_path_report_items(key_items.as_slice());
893    if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
894        if key_items_are_field_path_only(key_items.as_slice())
895            && statement.if_not_exists
896            && existing_field_path_index_matches_request(
897                existing_index,
898                field_paths.as_slice(),
899                statement.predicate_sql.as_deref(),
900                statement.uniqueness,
901            )
902        {
903            return Ok(BoundSqlDdlRequest {
904                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
905                    mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
906                    index_name: statement.name.clone(),
907                    entity_name: entity_name.to_string(),
908                    target_store: existing_index.store().to_string(),
909                    field_path: ddl_field_path_report(field_paths.as_slice()),
910                }),
911            });
912        }
913
914        return Err(SqlDdlBindError::DuplicateIndexName {
915            index_name: statement.name.clone(),
916        });
917    }
918    let predicate_sql =
919        validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
920    if let Some(existing_index) = find_expression_index_by_name(schema, statement.name.as_str()) {
921        if statement.if_not_exists
922            && existing_expression_index_matches_request(
923                existing_index,
924                key_items.as_slice(),
925                predicate_sql.as_deref(),
926                statement.uniqueness,
927            )
928        {
929            return Ok(BoundSqlDdlRequest {
930                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
931                    mutation_kind: SqlDdlMutationKind::AddExpressionIndex,
932                    index_name: statement.name.clone(),
933                    entity_name: entity_name.to_string(),
934                    target_store: existing_index.store().to_string(),
935                    field_path: ddl_key_item_report(key_items.as_slice()),
936                }),
937            });
938        }
939
940        return Err(SqlDdlBindError::DuplicateIndexName {
941            index_name: statement.name.clone(),
942        });
943    }
944    if key_items_are_field_path_only(key_items.as_slice()) {
945        reject_duplicate_field_path_index(
946            field_paths.as_slice(),
947            predicate_sql.as_deref(),
948            schema,
949        )?;
950    } else {
951        reject_duplicate_expression_index(key_items.as_slice(), predicate_sql.as_deref(), schema)?;
952    }
953    let candidate_index = candidate_index_snapshot(
954        statement.name.as_str(),
955        key_items.as_slice(),
956        predicate_sql.as_deref(),
957        statement.uniqueness,
958        schema,
959        index_store_path,
960    )?;
961
962    Ok(BoundSqlDdlRequest {
963        statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
964            index_name: statement.name.clone(),
965            entity_name: entity_name.to_string(),
966            key_items,
967            field_paths,
968            candidate_index,
969        }),
970    })
971}
972
973fn bind_drop_index_statement(
974    statement: &SqlDropIndexStatement,
975    accepted_before: &AcceptedSchemaSnapshot,
976    schema: &SchemaInfo,
977) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
978    let entity_name = schema
979        .entity_name()
980        .ok_or(SqlDdlBindError::MissingEntityName)?;
981
982    if let Some(sql_entity) = statement.entity.as_deref()
983        && !identifiers_tail_match(sql_entity, entity_name)
984    {
985        return Err(SqlDdlBindError::EntityMismatch {
986            sql_entity: sql_entity.to_string(),
987            expected_entity: entity_name.to_string(),
988        });
989    }
990    let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
991        accepted_before,
992        &statement.name,
993    )
994    .map_err(|error| match error {
995        SchemaDdlIndexDropCandidateError::Generated => {
996            SqlDdlBindError::GeneratedIndexDropRejected {
997                index_name: statement.name.clone(),
998            }
999        }
1000        SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
1001            entity_name: entity_name.to_string(),
1002            index_name: statement.name.clone(),
1003        },
1004        SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
1005            index_name: statement.name.clone(),
1006        },
1007    });
1008    let (dropped_index, field_path) = match drop_candidate {
1009        Ok((dropped_index, field_path)) => (dropped_index, field_path),
1010        Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
1011            return Ok(BoundSqlDdlRequest {
1012                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1013                    mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
1014                    index_name: statement.name.clone(),
1015                    entity_name: entity_name.to_string(),
1016                    target_store: "-".to_string(),
1017                    field_path: Vec::new(),
1018                }),
1019            });
1020        }
1021        Err(error) => return Err(error),
1022    };
1023    Ok(BoundSqlDdlRequest {
1024        statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
1025            index_name: statement.name.clone(),
1026            entity_name: entity_name.to_string(),
1027            dropped_index,
1028            field_path,
1029        }),
1030    })
1031}
1032
1033fn bind_alter_table_add_column_statement(
1034    statement: &SqlAlterTableAddColumnStatement,
1035    accepted_before: &AcceptedSchemaSnapshot,
1036    schema: &SchemaInfo,
1037) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1038    let entity_name = schema
1039        .entity_name()
1040        .ok_or(SqlDdlBindError::MissingEntityName)?;
1041
1042    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1043        return Err(SqlDdlBindError::EntityMismatch {
1044            sql_entity: statement.entity.clone(),
1045            expected_entity: entity_name.to_string(),
1046        });
1047    }
1048
1049    if schema
1050        .field_nullable(statement.column_name.as_str())
1051        .is_some()
1052    {
1053        return Err(SqlDdlBindError::DuplicateColumn {
1054            entity_name: entity_name.to_string(),
1055            column_name: statement.column_name.clone(),
1056        });
1057    }
1058
1059    let (kind, storage_decode, leaf_codec) = persisted_field_contract_for_sql_column_type(
1060        statement.column_type.as_str(),
1061    )
1062    .ok_or_else(|| SqlDdlBindError::UnsupportedAlterTableAddColumnType {
1063        entity_name: entity_name.to_string(),
1064        column_name: statement.column_name.clone(),
1065        column_type: statement.column_type.clone(),
1066    })?;
1067    let default = schema_field_default_for_sql_default(
1068        entity_name,
1069        statement.column_name.as_str(),
1070        statement.default.as_ref(),
1071        &kind,
1072        statement.nullable,
1073        storage_decode,
1074        leaf_codec,
1075    )?;
1076    if !statement.nullable && default.is_none() {
1077        return Err(SqlDdlBindError::UnsupportedAlterTableAddColumnNotNull {
1078            entity_name: entity_name.to_string(),
1079            column_name: statement.column_name.clone(),
1080        });
1081    }
1082    let field = PersistedFieldSnapshot::new_with_write_policy_and_origin(
1083        next_sql_ddl_field_id(accepted_before),
1084        statement.column_name.clone(),
1085        next_sql_ddl_field_slot(accepted_before),
1086        kind,
1087        Vec::new(),
1088        statement.nullable,
1089        default,
1090        SchemaFieldWritePolicy::from_model_policies(None, None),
1091        PersistedFieldOrigin::SqlDdl,
1092        storage_decode,
1093        leaf_codec,
1094    );
1095
1096    Ok(BoundSqlDdlRequest {
1097        statement: BoundSqlDdlStatement::AddColumn(BoundSqlAddColumnRequest {
1098            entity_name: entity_name.to_string(),
1099            field,
1100        }),
1101    })
1102}
1103
1104fn alter_table_alter_column_bind_error(
1105    statement: &SqlAlterTableAlterColumnStatement,
1106    schema: &SchemaInfo,
1107) -> SqlDdlBindError {
1108    let Some(entity_name) = schema.entity_name() else {
1109        return SqlDdlBindError::MissingEntityName;
1110    };
1111
1112    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1113        return SqlDdlBindError::EntityMismatch {
1114            sql_entity: statement.entity.clone(),
1115            expected_entity: entity_name.to_string(),
1116        };
1117    }
1118
1119    if schema
1120        .field_nullable(statement.column_name.as_str())
1121        .is_none()
1122    {
1123        return SqlDdlBindError::UnknownColumn {
1124            entity_name: entity_name.to_string(),
1125            column_name: statement.column_name.clone(),
1126        };
1127    }
1128
1129    SqlDdlBindError::UnsupportedAlterTableAlterColumn {
1130        entity_name: entity_name.to_string(),
1131        column_name: statement.column_name.clone(),
1132        action: statement.action.label().to_string(),
1133    }
1134}
1135
1136fn bind_alter_table_alter_column_statement(
1137    statement: &SqlAlterTableAlterColumnStatement,
1138    accepted_before: &AcceptedSchemaSnapshot,
1139    schema: &SchemaInfo,
1140) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1141    let Some(entity_name) = schema.entity_name() else {
1142        return Err(SqlDdlBindError::MissingEntityName);
1143    };
1144
1145    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1146        return Err(SqlDdlBindError::EntityMismatch {
1147            sql_entity: statement.entity.clone(),
1148            expected_entity: entity_name.to_string(),
1149        });
1150    }
1151
1152    let field = accepted_before
1153        .persisted_snapshot()
1154        .fields()
1155        .iter()
1156        .find(|field| field.name() == statement.column_name)
1157        .ok_or_else(|| SqlDdlBindError::UnknownColumn {
1158            entity_name: entity_name.to_string(),
1159            column_name: statement.column_name.clone(),
1160        })?;
1161
1162    match &statement.action {
1163        SqlAlterColumnAction::SetDefault(default) => {
1164            reject_generated_field_default_change(entity_name, field)?;
1165            let default =
1166                schema_field_default_for_alter_column_default(entity_name, field, default)?;
1167            Ok(bind_alter_table_alter_column_default(
1168                entity_name,
1169                field,
1170                default,
1171                SqlDdlMutationKind::SetFieldDefault,
1172            ))
1173        }
1174        SqlAlterColumnAction::DropDefault => {
1175            if !field.default().is_none() {
1176                reject_generated_field_default_change(entity_name, field)?;
1177            }
1178            if !field.nullable() && !field.default().is_none() {
1179                return Err(SqlDdlBindError::UnsupportedAlterTableDropDefaultRequired {
1180                    entity_name: entity_name.to_string(),
1181                    column_name: statement.column_name.clone(),
1182                });
1183            }
1184            Ok(bind_alter_table_alter_column_default(
1185                entity_name,
1186                field,
1187                SchemaFieldDefault::None,
1188                SqlDdlMutationKind::DropFieldDefault,
1189            ))
1190        }
1191        SqlAlterColumnAction::SetNotNull => Ok(bind_alter_table_alter_column_nullability(
1192            entity_name,
1193            field,
1194            false,
1195            SqlDdlMutationKind::SetFieldNotNull,
1196        )?),
1197        SqlAlterColumnAction::DropNotNull => Ok(bind_alter_table_alter_column_nullability(
1198            entity_name,
1199            field,
1200            true,
1201            SqlDdlMutationKind::DropFieldNotNull,
1202        )?),
1203    }
1204}
1205
1206fn bind_alter_table_drop_column_statement(
1207    statement: &SqlAlterTableDropColumnStatement,
1208    accepted_before: &AcceptedSchemaSnapshot,
1209    schema: &SchemaInfo,
1210) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1211    let entity_name = schema
1212        .entity_name()
1213        .ok_or(SqlDdlBindError::MissingEntityName)?;
1214
1215    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1216        return Err(SqlDdlBindError::EntityMismatch {
1217            sql_entity: statement.entity.clone(),
1218            expected_entity: entity_name.to_string(),
1219        });
1220    }
1221
1222    let accepted = accepted_before.persisted_snapshot();
1223    let Some(field) = accepted
1224        .fields()
1225        .iter()
1226        .find(|field| field.name() == statement.column_name)
1227    else {
1228        if statement.if_exists {
1229            return Ok(BoundSqlDdlRequest {
1230                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1231                    mutation_kind: SqlDdlMutationKind::DropField,
1232                    index_name: statement.column_name.clone(),
1233                    entity_name: entity_name.to_string(),
1234                    target_store: "-".to_string(),
1235                    field_path: vec![statement.column_name.clone()],
1236                }),
1237            });
1238        }
1239
1240        return Err(SqlDdlBindError::UnknownColumn {
1241            entity_name: entity_name.to_string(),
1242            column_name: statement.column_name.clone(),
1243        });
1244    };
1245
1246    if accepted.primary_key_field_id() == field.id() {
1247        return Err(SqlDdlBindError::PrimaryKeyFieldDropRejected {
1248            entity_name: entity_name.to_string(),
1249            column_name: statement.column_name.clone(),
1250        });
1251    }
1252
1253    if field.generated() {
1254        return Err(SqlDdlBindError::GeneratedFieldDropRejected {
1255            entity_name: entity_name.to_string(),
1256            column_name: statement.column_name.clone(),
1257        });
1258    }
1259
1260    if let Some(index_name) =
1261        resolve_sql_ddl_field_drop_dependent_index(accepted_before, field.id())
1262    {
1263        return Err(SqlDdlBindError::IndexedFieldDropRejected {
1264            entity_name: entity_name.to_string(),
1265            column_name: statement.column_name.clone(),
1266            index_name,
1267        });
1268    }
1269
1270    Err(SqlDdlBindError::UnsupportedAlterTableDropColumn {
1271        entity_name: entity_name.to_string(),
1272        column_name: statement.column_name.clone(),
1273    })
1274}
1275
1276fn bind_alter_table_rename_column_statement(
1277    statement: &SqlAlterTableRenameColumnStatement,
1278    accepted_before: &AcceptedSchemaSnapshot,
1279    schema: &SchemaInfo,
1280) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1281    let entity_name = schema
1282        .entity_name()
1283        .ok_or(SqlDdlBindError::MissingEntityName)?;
1284
1285    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1286        return Err(SqlDdlBindError::EntityMismatch {
1287            sql_entity: statement.entity.clone(),
1288            expected_entity: entity_name.to_string(),
1289        });
1290    }
1291
1292    let accepted = accepted_before.persisted_snapshot();
1293    let Some(field) = accepted
1294        .fields()
1295        .iter()
1296        .find(|field| field.name() == statement.old_column_name)
1297    else {
1298        return Err(SqlDdlBindError::UnknownColumn {
1299            entity_name: entity_name.to_string(),
1300            column_name: statement.old_column_name.clone(),
1301        });
1302    };
1303
1304    if statement.old_column_name == statement.new_column_name {
1305        return Ok(BoundSqlDdlRequest {
1306            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1307                mutation_kind: SqlDdlMutationKind::RenameField,
1308                index_name: statement.old_column_name.clone(),
1309                entity_name: entity_name.to_string(),
1310                target_store: "-".to_string(),
1311                field_path: vec![statement.old_column_name.clone()],
1312            }),
1313        });
1314    }
1315
1316    if accepted
1317        .fields()
1318        .iter()
1319        .any(|field| field.name() == statement.new_column_name)
1320    {
1321        return Err(SqlDdlBindError::DuplicateColumn {
1322            entity_name: entity_name.to_string(),
1323            column_name: statement.new_column_name.clone(),
1324        });
1325    }
1326
1327    if field.generated() {
1328        return Err(SqlDdlBindError::GeneratedFieldRenameRejected {
1329            entity_name: entity_name.to_string(),
1330            column_name: statement.old_column_name.clone(),
1331        });
1332    }
1333
1334    Ok(BoundSqlDdlRequest {
1335        statement: BoundSqlDdlStatement::RenameColumn(BoundSqlRenameColumnRequest {
1336            entity_name: entity_name.to_string(),
1337            field: field.clone(),
1338            new_name: statement.new_column_name.clone(),
1339        }),
1340    })
1341}
1342
1343fn bind_alter_table_alter_column_default(
1344    entity_name: &str,
1345    field: &PersistedFieldSnapshot,
1346    default: SchemaFieldDefault,
1347    mutation_kind: SqlDdlMutationKind,
1348) -> BoundSqlDdlRequest {
1349    if field.default() == &default {
1350        return BoundSqlDdlRequest {
1351            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1352                mutation_kind,
1353                index_name: field.name().to_string(),
1354                entity_name: entity_name.to_string(),
1355                target_store: entity_name.to_string(),
1356                field_path: vec![field.name().to_string()],
1357            }),
1358        };
1359    }
1360
1361    BoundSqlDdlRequest {
1362        statement: BoundSqlDdlStatement::AlterColumnDefault(BoundSqlAlterColumnDefaultRequest {
1363            entity_name: entity_name.to_string(),
1364            field: field.clone(),
1365            default,
1366            mutation_kind,
1367        }),
1368    }
1369}
1370
1371fn reject_generated_field_default_change(
1372    entity_name: &str,
1373    field: &PersistedFieldSnapshot,
1374) -> Result<(), SqlDdlBindError> {
1375    if field.generated() {
1376        return Err(SqlDdlBindError::GeneratedFieldDefaultChangeRejected {
1377            entity_name: entity_name.to_string(),
1378            column_name: field.name().to_string(),
1379        });
1380    }
1381
1382    Ok(())
1383}
1384
1385fn bind_alter_table_alter_column_nullability(
1386    entity_name: &str,
1387    field: &PersistedFieldSnapshot,
1388    nullable: bool,
1389    mutation_kind: SqlDdlMutationKind,
1390) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1391    if field.nullable() == nullable {
1392        return Ok(BoundSqlDdlRequest {
1393            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1394                mutation_kind,
1395                index_name: field.name().to_string(),
1396                entity_name: entity_name.to_string(),
1397                target_store: entity_name.to_string(),
1398                field_path: vec![field.name().to_string()],
1399            }),
1400        });
1401    }
1402
1403    reject_generated_field_nullability_change(entity_name, field)?;
1404
1405    Ok(BoundSqlDdlRequest {
1406        statement: BoundSqlDdlStatement::AlterColumnNullability(
1407            BoundSqlAlterColumnNullabilityRequest {
1408                entity_name: entity_name.to_string(),
1409                field: field.clone(),
1410                nullable,
1411                mutation_kind,
1412            },
1413        ),
1414    })
1415}
1416
1417fn reject_generated_field_nullability_change(
1418    entity_name: &str,
1419    field: &PersistedFieldSnapshot,
1420) -> Result<(), SqlDdlBindError> {
1421    if field.generated() {
1422        return Err(SqlDdlBindError::GeneratedFieldNullabilityChangeRejected {
1423            entity_name: entity_name.to_string(),
1424            column_name: field.name().to_string(),
1425        });
1426    }
1427
1428    Ok(())
1429}
1430
1431fn schema_field_default_for_sql_default(
1432    entity_name: &str,
1433    column_name: &str,
1434    default: Option<&crate::value::Value>,
1435    kind: &PersistedFieldKind,
1436    nullable: bool,
1437    storage_decode: FieldStorageDecode,
1438    leaf_codec: LeafCodec,
1439) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1440    let Some(default) = default else {
1441        return Ok(SchemaFieldDefault::None);
1442    };
1443    if matches!(default, crate::value::Value::Null) {
1444        return Err(SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1445            entity_name: entity_name.to_string(),
1446            column_name: column_name.to_string(),
1447            detail: "NULL cannot be used as an accepted database default".to_string(),
1448        });
1449    }
1450
1451    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(kind, default)
1452        .unwrap_or_else(|| default.clone());
1453    let contract =
1454        AcceptedFieldDecodeContract::new(column_name, kind, nullable, storage_decode, leaf_codec);
1455    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1456        |error| SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1457            entity_name: entity_name.to_string(),
1458            column_name: column_name.to_string(),
1459            detail: error.to_string(),
1460        },
1461    )?;
1462
1463    Ok(SchemaFieldDefault::SlotPayload(payload))
1464}
1465
1466fn schema_field_default_for_alter_column_default(
1467    entity_name: &str,
1468    field: &PersistedFieldSnapshot,
1469    default: &crate::value::Value,
1470) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1471    if matches!(default, crate::value::Value::Null) {
1472        return Err(SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1473            entity_name: entity_name.to_string(),
1474            column_name: field.name().to_string(),
1475            detail: "NULL cannot be used as an accepted database default".to_string(),
1476        });
1477    }
1478
1479    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(field.kind(), default)
1480        .unwrap_or_else(|| default.clone());
1481    let contract = AcceptedFieldDecodeContract::new(
1482        field.name(),
1483        field.kind(),
1484        field.nullable(),
1485        field.storage_decode(),
1486        field.leaf_codec(),
1487    );
1488    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1489        |error| SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1490            entity_name: entity_name.to_string(),
1491            column_name: field.name().to_string(),
1492            detail: error.to_string(),
1493        },
1494    )?;
1495
1496    Ok(SchemaFieldDefault::SlotPayload(payload))
1497}
1498
1499fn next_sql_ddl_field_id(accepted_before: &AcceptedSchemaSnapshot) -> FieldId {
1500    let next = accepted_before
1501        .persisted_snapshot()
1502        .fields()
1503        .iter()
1504        .map(|field| field.id().get())
1505        .max()
1506        .unwrap_or(0)
1507        .checked_add(1)
1508        .expect("accepted field IDs should not be exhausted");
1509
1510    FieldId::new(next)
1511}
1512
1513fn next_sql_ddl_field_slot(accepted_before: &AcceptedSchemaSnapshot) -> SchemaFieldSlot {
1514    let next = accepted_before
1515        .persisted_snapshot()
1516        .row_layout()
1517        .field_to_slot()
1518        .iter()
1519        .map(|(_, slot)| slot.get())
1520        .max()
1521        .unwrap_or(0)
1522        .checked_add(1)
1523        .expect("accepted row slots should not be exhausted");
1524
1525    SchemaFieldSlot::new(next)
1526}
1527
1528fn persisted_field_contract_for_sql_column_type(
1529    column_type: &str,
1530) -> Option<(PersistedFieldKind, FieldStorageDecode, LeafCodec)> {
1531    let normalized = column_type.trim().to_ascii_lowercase();
1532    match normalized.as_str() {
1533        "bool" | "boolean" => Some((
1534            PersistedFieldKind::Bool,
1535            FieldStorageDecode::ByKind,
1536            LeafCodec::Scalar(ScalarCodec::Bool),
1537        )),
1538        "int" | "integer" => Some((
1539            PersistedFieldKind::Int,
1540            FieldStorageDecode::ByKind,
1541            LeafCodec::Scalar(ScalarCodec::Int64),
1542        )),
1543        "nat" | "natural" => Some((
1544            PersistedFieldKind::Nat,
1545            FieldStorageDecode::ByKind,
1546            LeafCodec::Scalar(ScalarCodec::Nat64),
1547        )),
1548        "text" | "string" => Some((
1549            PersistedFieldKind::Text { max_len: None },
1550            FieldStorageDecode::ByKind,
1551            LeafCodec::Scalar(ScalarCodec::Text),
1552        )),
1553        _ => None,
1554    }
1555}
1556
1557#[derive(Clone, Debug, Eq, PartialEq)]
1558pub(in crate::db) enum BoundSqlDdlCreateIndexKey {
1559    FieldPath(BoundSqlDdlFieldPath),
1560    Expression(BoundSqlDdlExpressionKey),
1561}
1562
1563///
1564/// BoundSqlDdlExpressionKey
1565///
1566/// Accepted expression-index key target for SQL DDL binding.
1567///
1568#[derive(Clone, Debug, Eq, PartialEq)]
1569pub(in crate::db) struct BoundSqlDdlExpressionKey {
1570    op: PersistedIndexExpressionOp,
1571    source: BoundSqlDdlFieldPath,
1572    canonical_sql: String,
1573}
1574
1575impl BoundSqlDdlExpressionKey {
1576    /// Return the accepted expression operation.
1577    #[must_use]
1578    pub(in crate::db) const fn op(&self) -> PersistedIndexExpressionOp {
1579        self.op
1580    }
1581
1582    /// Borrow the accepted source field path.
1583    #[must_use]
1584    pub(in crate::db) const fn source(&self) -> &BoundSqlDdlFieldPath {
1585        &self.source
1586    }
1587
1588    /// Borrow the SQL-facing canonical expression text.
1589    #[must_use]
1590    pub(in crate::db) const fn canonical_sql(&self) -> &str {
1591        self.canonical_sql.as_str()
1592    }
1593}
1594
1595fn bind_create_index_key_item(
1596    key_item: &SqlCreateIndexKeyItem,
1597    entity_name: &str,
1598    schema: &SchemaInfo,
1599) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1600    match key_item {
1601        SqlCreateIndexKeyItem::FieldPath(field_path) => {
1602            bind_create_index_field_path(field_path.as_str(), entity_name, schema)
1603                .map(BoundSqlDdlCreateIndexKey::FieldPath)
1604        }
1605        SqlCreateIndexKeyItem::Expression(expression) => {
1606            bind_create_index_expression_key(expression, entity_name, schema)
1607        }
1608    }
1609}
1610
1611fn bind_create_index_expression_key(
1612    expression: &SqlCreateIndexExpressionKey,
1613    entity_name: &str,
1614    schema: &SchemaInfo,
1615) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1616    let source = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
1617
1618    Ok(BoundSqlDdlCreateIndexKey::Expression(
1619        BoundSqlDdlExpressionKey {
1620            op: expression_op_from_sql_function(expression.function),
1621            source,
1622            canonical_sql: expression.canonical_sql(),
1623        },
1624    ))
1625}
1626
1627const fn expression_op_from_sql_function(
1628    function: crate::db::sql::parser::SqlCreateIndexExpressionFunction,
1629) -> PersistedIndexExpressionOp {
1630    match function {
1631        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Lower => {
1632            PersistedIndexExpressionOp::Lower
1633        }
1634        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Upper => {
1635            PersistedIndexExpressionOp::Upper
1636        }
1637        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Trim => {
1638            PersistedIndexExpressionOp::Trim
1639        }
1640    }
1641}
1642
1643fn key_items_are_field_path_only(key_items: &[BoundSqlDdlCreateIndexKey]) -> bool {
1644    key_items
1645        .iter()
1646        .all(|key_item| matches!(key_item, BoundSqlDdlCreateIndexKey::FieldPath(_)))
1647}
1648
1649fn create_index_field_path_report_items(
1650    key_items: &[BoundSqlDdlCreateIndexKey],
1651) -> Vec<BoundSqlDdlFieldPath> {
1652    key_items
1653        .iter()
1654        .map(|key_item| match key_item {
1655            BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.clone(),
1656            BoundSqlDdlCreateIndexKey::Expression(expression) => expression.source().clone(),
1657        })
1658        .collect()
1659}
1660
1661fn bind_create_index_field_path(
1662    field_path: &str,
1663    entity_name: &str,
1664    schema: &SchemaInfo,
1665) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
1666    let mut path = field_path
1667        .split('.')
1668        .map(str::trim)
1669        .filter(|segment| !segment.is_empty());
1670    let Some(root) = path.next() else {
1671        return Err(SqlDdlBindError::UnknownFieldPath {
1672            entity_name: entity_name.to_string(),
1673            field_path: field_path.to_string(),
1674        });
1675    };
1676    let segments = path.map(str::to_string).collect::<Vec<_>>();
1677
1678    let capabilities = if segments.is_empty() {
1679        schema.sql_capabilities(root)
1680    } else {
1681        schema.nested_sql_capabilities(root, segments.as_slice())
1682    }
1683    .ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
1684        entity_name: entity_name.to_string(),
1685        field_path: field_path.to_string(),
1686    })?;
1687
1688    if !capabilities.orderable() {
1689        return Err(SqlDdlBindError::FieldPathNotIndexable {
1690            field_path: field_path.to_string(),
1691        });
1692    }
1693
1694    let mut accepted_path = Vec::with_capacity(segments.len() + 1);
1695    accepted_path.push(root.to_string());
1696    accepted_path.extend(segments.iter().cloned());
1697
1698    Ok(BoundSqlDdlFieldPath {
1699        root: root.to_string(),
1700        segments,
1701        accepted_path,
1702    })
1703}
1704
1705fn find_field_path_index_by_name<'a>(
1706    schema: &'a SchemaInfo,
1707    index_name: &str,
1708) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
1709    schema
1710        .field_path_indexes()
1711        .iter()
1712        .find(|index| index.name() == index_name)
1713}
1714
1715fn existing_field_path_index_matches_request(
1716    index: &crate::db::schema::SchemaIndexInfo,
1717    field_paths: &[BoundSqlDdlFieldPath],
1718    predicate_sql: Option<&str>,
1719    uniqueness: SqlCreateIndexUniqueness,
1720) -> bool {
1721    let fields = index.fields();
1722
1723    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1724        && index.predicate_sql() == predicate_sql
1725        && fields.len() == field_paths.len()
1726        && fields
1727            .iter()
1728            .zip(field_paths)
1729            .all(|(field, requested)| field.path() == requested.accepted_path())
1730}
1731
1732fn find_expression_index_by_name<'a>(
1733    schema: &'a SchemaInfo,
1734    index_name: &str,
1735) -> Option<&'a SchemaExpressionIndexInfo> {
1736    schema
1737        .expression_indexes()
1738        .iter()
1739        .find(|index| index.name() == index_name)
1740}
1741
1742fn existing_expression_index_matches_request(
1743    index: &SchemaExpressionIndexInfo,
1744    key_items: &[BoundSqlDdlCreateIndexKey],
1745    predicate_sql: Option<&str>,
1746    uniqueness: SqlCreateIndexUniqueness,
1747) -> bool {
1748    let existing_key_items = index.key_items();
1749
1750    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1751        && index.predicate_sql() == predicate_sql
1752        && existing_key_items.len() == key_items.len()
1753        && existing_key_items
1754            .iter()
1755            .zip(key_items)
1756            .all(existing_expression_key_item_matches_request)
1757}
1758
1759fn existing_expression_key_item_matches_request(
1760    existing: (
1761        &SchemaExpressionIndexKeyItemInfo,
1762        &BoundSqlDdlCreateIndexKey,
1763    ),
1764) -> bool {
1765    let (existing, requested) = existing;
1766    match (existing, requested) {
1767        (
1768            SchemaExpressionIndexKeyItemInfo::FieldPath(existing),
1769            BoundSqlDdlCreateIndexKey::FieldPath(requested),
1770        ) => existing.path() == requested.accepted_path(),
1771        (
1772            SchemaExpressionIndexKeyItemInfo::Expression(existing),
1773            BoundSqlDdlCreateIndexKey::Expression(requested),
1774        ) => existing_expression_component_matches_request(
1775            existing.op(),
1776            existing.source().path(),
1777            existing.canonical_text(),
1778            requested,
1779        ),
1780        _ => false,
1781    }
1782}
1783
1784fn existing_expression_component_matches_request(
1785    existing_op: PersistedIndexExpressionOp,
1786    existing_path: &[String],
1787    existing_canonical_text: &str,
1788    requested: &BoundSqlDdlExpressionKey,
1789) -> bool {
1790    let requested_path = requested.source().accepted_path();
1791    let requested_canonical_text = format!("expr:v1:{}", requested.canonical_sql());
1792
1793    existing_op == requested.op()
1794        && existing_path == requested_path
1795        && existing_canonical_text == requested_canonical_text
1796}
1797
1798fn reject_duplicate_expression_index(
1799    key_items: &[BoundSqlDdlCreateIndexKey],
1800    predicate_sql: Option<&str>,
1801    schema: &SchemaInfo,
1802) -> Result<(), SqlDdlBindError> {
1803    let Some(existing_index) = schema.expression_indexes().iter().find(|index| {
1804        existing_expression_index_matches_request(
1805            index,
1806            key_items,
1807            predicate_sql,
1808            if index.unique() {
1809                SqlCreateIndexUniqueness::Unique
1810            } else {
1811                SqlCreateIndexUniqueness::NonUnique
1812            },
1813        )
1814    }) else {
1815        return Ok(());
1816    };
1817
1818    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1819        field_path: ddl_key_item_report(key_items).join(","),
1820        existing_index: existing_index.name().to_string(),
1821    })
1822}
1823
1824fn reject_duplicate_field_path_index(
1825    field_paths: &[BoundSqlDdlFieldPath],
1826    predicate_sql: Option<&str>,
1827    schema: &SchemaInfo,
1828) -> Result<(), SqlDdlBindError> {
1829    let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
1830        let fields = index.fields();
1831        index.predicate_sql() == predicate_sql
1832            && fields.len() == field_paths.len()
1833            && fields
1834                .iter()
1835                .zip(field_paths)
1836                .all(|(field, requested)| field.path() == requested.accepted_path())
1837    }) else {
1838        return Ok(());
1839    };
1840
1841    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1842        field_path: ddl_field_path_report(field_paths).join(","),
1843        existing_index: existing_index.name().to_string(),
1844    })
1845}
1846
1847fn candidate_index_snapshot(
1848    index_name: &str,
1849    key_items: &[BoundSqlDdlCreateIndexKey],
1850    predicate_sql: Option<&str>,
1851    uniqueness: SqlCreateIndexUniqueness,
1852    schema: &SchemaInfo,
1853    index_store_path: &'static str,
1854) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
1855    let key = if key_items_are_field_path_only(key_items) {
1856        PersistedIndexKeySnapshot::FieldPath(
1857            key_items
1858                .iter()
1859                .map(|key_item| {
1860                    let BoundSqlDdlCreateIndexKey::FieldPath(field_path) = key_item else {
1861                        unreachable!("field-path-only index checked before field-path lowering");
1862                    };
1863
1864                    accepted_index_field_path_snapshot(schema, field_path)
1865                })
1866                .collect::<Result<Vec<_>, _>>()?,
1867        )
1868    } else {
1869        PersistedIndexKeySnapshot::Items(
1870            key_items
1871                .iter()
1872                .map(|key_item| match key_item {
1873                    BoundSqlDdlCreateIndexKey::FieldPath(field_path) => {
1874                        accepted_index_field_path_snapshot(schema, field_path)
1875                            .map(PersistedIndexKeyItemSnapshot::FieldPath)
1876                    }
1877                    BoundSqlDdlCreateIndexKey::Expression(expression) => {
1878                        accepted_index_expression_snapshot(schema, expression)
1879                    }
1880                })
1881                .collect::<Result<Vec<_>, _>>()?,
1882        )
1883    };
1884
1885    Ok(PersistedIndexSnapshot::new_sql_ddl(
1886        schema.next_secondary_index_ordinal(),
1887        index_name.to_string(),
1888        index_store_path.to_string(),
1889        matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
1890        key,
1891        predicate_sql.map(str::to_string),
1892    ))
1893}
1894
1895fn accepted_index_field_path_snapshot(
1896    schema: &SchemaInfo,
1897    field_path: &BoundSqlDdlFieldPath,
1898) -> Result<crate::db::schema::PersistedIndexFieldPathSnapshot, SqlDdlBindError> {
1899    schema
1900        .accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
1901        .ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
1902            field_path: field_path.accepted_path().join("."),
1903        })
1904}
1905
1906fn accepted_index_expression_snapshot(
1907    schema: &SchemaInfo,
1908    expression: &BoundSqlDdlExpressionKey,
1909) -> Result<PersistedIndexKeyItemSnapshot, SqlDdlBindError> {
1910    let source = accepted_index_field_path_snapshot(schema, expression.source())?;
1911    let Some(output_kind) = expression_output_kind(expression.op(), source.kind()) else {
1912        return Err(SqlDdlBindError::FieldPathNotIndexable {
1913            field_path: expression.source().accepted_path().join("."),
1914        });
1915    };
1916
1917    Ok(PersistedIndexKeyItemSnapshot::Expression(Box::new(
1918        PersistedIndexExpressionSnapshot::new(
1919            expression.op(),
1920            source.clone(),
1921            source.kind().clone(),
1922            output_kind,
1923            format!("expr:v1:{}", expression.canonical_sql()),
1924        ),
1925    )))
1926}
1927
1928fn expression_output_kind(
1929    op: PersistedIndexExpressionOp,
1930    source_kind: &PersistedFieldKind,
1931) -> Option<PersistedFieldKind> {
1932    match op {
1933        PersistedIndexExpressionOp::Lower
1934        | PersistedIndexExpressionOp::Upper
1935        | PersistedIndexExpressionOp::Trim
1936        | PersistedIndexExpressionOp::LowerTrim => {
1937            if matches!(source_kind, PersistedFieldKind::Text { .. }) {
1938                Some(source_kind.clone())
1939            } else {
1940                None
1941            }
1942        }
1943        PersistedIndexExpressionOp::Date => {
1944            if matches!(
1945                source_kind,
1946                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1947            ) {
1948                Some(PersistedFieldKind::Date)
1949            } else {
1950                None
1951            }
1952        }
1953        PersistedIndexExpressionOp::Year
1954        | PersistedIndexExpressionOp::Month
1955        | PersistedIndexExpressionOp::Day => {
1956            if matches!(
1957                source_kind,
1958                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1959            ) {
1960                Some(PersistedFieldKind::Int)
1961            } else {
1962                None
1963            }
1964        }
1965    }
1966}
1967
1968fn validated_create_index_predicate_sql(
1969    predicate_sql: Option<&str>,
1970    schema: &SchemaInfo,
1971) -> Result<Option<String>, SqlDdlBindError> {
1972    let Some(predicate_sql) = predicate_sql else {
1973        return Ok(None);
1974    };
1975    let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
1976        SqlDdlBindError::InvalidFilteredIndexPredicate {
1977            detail: error.to_string(),
1978        }
1979    })?;
1980    validate_predicate(schema, &predicate).map_err(|error| {
1981        SqlDdlBindError::InvalidFilteredIndexPredicate {
1982            detail: error.to_string(),
1983        }
1984    })?;
1985
1986    Ok(Some(predicate_sql.to_string()))
1987}
1988
1989fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
1990    match field_paths {
1991        [field_path] => field_path.accepted_path().to_vec(),
1992        _ => vec![
1993            field_paths
1994                .iter()
1995                .map(|field_path| field_path.accepted_path().join("."))
1996                .collect::<Vec<_>>()
1997                .join(","),
1998        ],
1999    }
2000}
2001
2002fn ddl_key_item_report(key_items: &[BoundSqlDdlCreateIndexKey]) -> Vec<String> {
2003    match key_items {
2004        [key_item] => vec![ddl_key_item_text(key_item)],
2005        _ => vec![
2006            key_items
2007                .iter()
2008                .map(ddl_key_item_text)
2009                .collect::<Vec<_>>()
2010                .join(","),
2011        ],
2012    }
2013}
2014
2015fn ddl_key_item_text(key_item: &BoundSqlDdlCreateIndexKey) -> String {
2016    match key_item {
2017        BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.accepted_path().join("."),
2018        BoundSqlDdlCreateIndexKey::Expression(expression) => expression.canonical_sql().to_string(),
2019    }
2020}
2021
2022/// Lower one bound SQL DDL request through schema mutation admission.
2023pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
2024    request: &BoundSqlDdlRequest,
2025) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
2026    match request.statement() {
2027        BoundSqlDdlStatement::AddColumn(add) => {
2028            Ok(admit_sql_ddl_field_addition_candidate(add.field()))
2029        }
2030        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
2031            Ok(admit_sql_ddl_field_default_candidate(alter.field()))
2032        }
2033        BoundSqlDdlStatement::AlterColumnNullability(alter) => {
2034            Ok(admit_sql_ddl_field_nullability_candidate(alter.field()))
2035        }
2036        BoundSqlDdlStatement::RenameColumn(rename) => {
2037            let after = rename
2038                .field()
2039                .clone_with_name(rename.new_name().to_string());
2040            Ok(admit_sql_ddl_field_rename_candidate(rename.field(), &after))
2041        }
2042        BoundSqlDdlStatement::CreateIndex(create) => {
2043            if create.candidate_index().key().is_field_path_only() {
2044                admit_sql_ddl_field_path_index_candidate(create.candidate_index())
2045            } else {
2046                admit_sql_ddl_expression_index_candidate(create.candidate_index())
2047            }
2048        }
2049        BoundSqlDdlStatement::DropIndex(drop) => {
2050            admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
2051        }
2052        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
2053    }
2054    .map_err(SqlDdlLoweringError::MutationAdmission)
2055}
2056
2057/// Derive the accepted-after schema snapshot for one bound SQL DDL request.
2058pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
2059    accepted_before: &AcceptedSchemaSnapshot,
2060    request: &BoundSqlDdlRequest,
2061) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
2062    match request.statement() {
2063        BoundSqlDdlStatement::AddColumn(add) => {
2064            derive_sql_ddl_field_addition_accepted_after(accepted_before, add.field().clone())
2065        }
2066        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
2067            derive_sql_ddl_field_default_accepted_after(
2068                accepted_before,
2069                alter.field_name(),
2070                alter.default().clone(),
2071            )
2072        }
2073        BoundSqlDdlStatement::AlterColumnNullability(alter) => {
2074            derive_sql_ddl_field_nullability_accepted_after(
2075                accepted_before,
2076                alter.field_name(),
2077                alter.nullable(),
2078            )
2079        }
2080        BoundSqlDdlStatement::RenameColumn(rename) => derive_sql_ddl_field_rename_accepted_after(
2081            accepted_before,
2082            rename.old_name(),
2083            rename.new_name(),
2084        ),
2085        BoundSqlDdlStatement::CreateIndex(create) => {
2086            if create.candidate_index().key().is_field_path_only() {
2087                derive_sql_ddl_field_path_index_accepted_after(
2088                    accepted_before,
2089                    create.candidate_index().clone(),
2090                )
2091            } else {
2092                derive_sql_ddl_expression_index_accepted_after(
2093                    accepted_before,
2094                    create.candidate_index().clone(),
2095                )
2096            }
2097        }
2098        BoundSqlDdlStatement::DropIndex(drop) => {
2099            derive_sql_ddl_secondary_index_drop_accepted_after(
2100                accepted_before,
2101                drop.dropped_index(),
2102            )
2103        }
2104        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
2105    }
2106    .map_err(SqlDdlLoweringError::MutationAdmission)
2107}
2108
2109fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
2110    match bound.statement() {
2111        BoundSqlDdlStatement::AddColumn(add) => SqlDdlPreparationReport {
2112            mutation_kind: if add.field().default().is_none() {
2113                SqlDdlMutationKind::AddNullableField
2114            } else {
2115                SqlDdlMutationKind::AddDefaultedField
2116            },
2117            target_index: add.field().name().to_string(),
2118            target_store: add.entity_name().to_string(),
2119            field_path: vec![add.field().name().to_string()],
2120            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2121            rows_scanned: 0,
2122            index_keys_written: 0,
2123        },
2124        BoundSqlDdlStatement::AlterColumnDefault(alter) => SqlDdlPreparationReport {
2125            mutation_kind: alter.mutation_kind(),
2126            target_index: alter.field_name().to_string(),
2127            target_store: alter.entity_name().to_string(),
2128            field_path: vec![alter.field_name().to_string()],
2129            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2130            rows_scanned: 0,
2131            index_keys_written: 0,
2132        },
2133        BoundSqlDdlStatement::AlterColumnNullability(alter) => SqlDdlPreparationReport {
2134            mutation_kind: alter.mutation_kind(),
2135            target_index: alter.field_name().to_string(),
2136            target_store: alter.entity_name().to_string(),
2137            field_path: vec![alter.field_name().to_string()],
2138            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2139            rows_scanned: 0,
2140            index_keys_written: 0,
2141        },
2142        BoundSqlDdlStatement::RenameColumn(rename) => SqlDdlPreparationReport {
2143            mutation_kind: SqlDdlMutationKind::RenameField,
2144            target_index: rename.new_name().to_string(),
2145            target_store: rename.entity_name().to_string(),
2146            field_path: vec![rename.old_name().to_string(), rename.new_name().to_string()],
2147            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2148            rows_scanned: 0,
2149            index_keys_written: 0,
2150        },
2151        BoundSqlDdlStatement::CreateIndex(create) => {
2152            let target = create.candidate_index();
2153
2154            SqlDdlPreparationReport {
2155                mutation_kind: if target.key().is_field_path_only() {
2156                    SqlDdlMutationKind::AddFieldPathIndex
2157                } else {
2158                    SqlDdlMutationKind::AddExpressionIndex
2159                },
2160                target_index: target.name().to_string(),
2161                target_store: target.store().to_string(),
2162                field_path: ddl_key_item_report(create.key_items()),
2163                execution_status: SqlDdlExecutionStatus::PreparedOnly,
2164                rows_scanned: 0,
2165                index_keys_written: 0,
2166            }
2167        }
2168        BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
2169            mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
2170            target_index: drop.index_name().to_string(),
2171            target_store: drop.dropped_index().store().to_string(),
2172            field_path: drop.field_path().to_vec(),
2173            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2174            rows_scanned: 0,
2175            index_keys_written: 0,
2176        },
2177        BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
2178            mutation_kind: no_op.mutation_kind(),
2179            target_index: no_op.index_name().to_string(),
2180            target_store: no_op.target_store().to_string(),
2181            field_path: no_op.field_path().to_vec(),
2182            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2183            rows_scanned: 0,
2184            index_keys_written: 0,
2185        },
2186    }
2187}