Skip to main content

icydb_core/db/sql/
ddl.rs

1//! Module: db::sql::ddl
2//! Responsibility: bind parsed SQL DDL to accepted schema catalog contracts.
3//! Does not own: mutation planning, physical index rebuilds, or SQL execution.
4//! Boundary: translates parser-owned DDL syntax into catalog-native requests.
5
6#![allow(
7    dead_code,
8    reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
9)]
10
11use crate::db::{
12    data::encode_runtime_value_for_accepted_field_contract,
13    predicate::parse_sql_predicate,
14    query::predicate::validate_predicate,
15    schema::{
16        AcceptedFieldDecodeContract, AcceptedSchemaSnapshot, FieldId, PersistedFieldKind,
17        PersistedFieldOrigin, PersistedFieldSnapshot, PersistedIndexExpressionOp,
18        PersistedIndexExpressionSnapshot, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
19        PersistedIndexSnapshot, SchemaDdlAcceptedSnapshotDerivation,
20        SchemaDdlIndexDropCandidateError, SchemaDdlMutationAdmission,
21        SchemaDdlMutationAdmissionError, SchemaExpressionIndexInfo,
22        SchemaExpressionIndexKeyItemInfo, SchemaFieldDefault, SchemaFieldSlot,
23        SchemaFieldWritePolicy, SchemaInfo, admit_sql_ddl_expression_index_candidate,
24        admit_sql_ddl_field_addition_candidate, admit_sql_ddl_field_default_candidate,
25        admit_sql_ddl_field_drop_candidate, admit_sql_ddl_field_nullability_candidate,
26        admit_sql_ddl_field_path_index_candidate, admit_sql_ddl_field_rename_candidate,
27        admit_sql_ddl_secondary_index_drop_candidate,
28        canonicalize_strict_sql_literal_for_persisted_kind,
29        derive_sql_ddl_expression_index_accepted_after,
30        derive_sql_ddl_field_addition_accepted_after, derive_sql_ddl_field_default_accepted_after,
31        derive_sql_ddl_field_drop_accepted_after, derive_sql_ddl_field_nullability_accepted_after,
32        derive_sql_ddl_field_path_index_accepted_after, derive_sql_ddl_field_rename_accepted_after,
33        derive_sql_ddl_secondary_index_drop_accepted_after,
34        resolve_sql_ddl_field_drop_dependent_index, resolve_sql_ddl_secondary_index_drop_candidate,
35    },
36    sql::{
37        identifier::identifiers_tail_match,
38        parser::{
39            SqlAlterColumnAction, SqlAlterTableAddColumnStatement,
40            SqlAlterTableAlterColumnStatement, SqlAlterTableDropColumnStatement,
41            SqlAlterTableRenameColumnStatement, SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem,
42            SqlCreateIndexStatement, SqlCreateIndexUniqueness, SqlDdlStatement,
43            SqlDropIndexStatement, SqlStatement,
44        },
45    },
46};
47use crate::model::field::{FieldStorageDecode, LeafCodec, ScalarCodec};
48use thiserror::Error as ThisError;
49
50///
51/// PreparedSqlDdlCommand
52///
53/// Fully prepared SQL DDL command. This is intentionally not executable yet:
54/// it packages the accepted-catalog binding, accepted-after derivation, and
55/// schema mutation admission proof for the future execution boundary.
56///
57#[derive(Clone, Debug, Eq, PartialEq)]
58pub(in crate::db) struct PreparedSqlDdlCommand {
59    bound: BoundSqlDdlRequest,
60    derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
61    report: SqlDdlPreparationReport,
62}
63
64impl PreparedSqlDdlCommand {
65    /// Borrow the accepted-catalog-bound DDL request.
66    #[must_use]
67    pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
68        &self.bound
69    }
70
71    /// Borrow the accepted-after derivation proof.
72    #[must_use]
73    pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
74        self.derivation.as_ref()
75    }
76
77    /// Borrow the developer-facing preparation report.
78    #[must_use]
79    pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
80        &self.report
81    }
82
83    /// Return whether this prepared command needs schema or storage mutation.
84    #[must_use]
85    pub(in crate::db) const fn mutates_schema(&self) -> bool {
86        self.derivation.is_some()
87    }
88}
89
90///
91/// SqlDdlPreparationReport
92///
93/// Compact report for a DDL command that has passed all pre-execution
94/// frontend and schema-mutation checks.
95///
96#[derive(Clone, Debug, Eq, PartialEq)]
97pub struct SqlDdlPreparationReport {
98    mutation_kind: SqlDdlMutationKind,
99    target_index: String,
100    target_store: String,
101    field_path: Vec<String>,
102    execution_status: SqlDdlExecutionStatus,
103    rows_scanned: usize,
104    index_keys_written: usize,
105}
106
107impl SqlDdlPreparationReport {
108    /// Return the prepared DDL mutation kind.
109    #[must_use]
110    pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
111        self.mutation_kind
112    }
113
114    /// Borrow the target accepted index name.
115    #[must_use]
116    pub const fn target_index(&self) -> &str {
117        self.target_index.as_str()
118    }
119
120    /// Borrow the target accepted index store path.
121    #[must_use]
122    pub const fn target_store(&self) -> &str {
123        self.target_store.as_str()
124    }
125
126    /// Borrow the target field path.
127    #[must_use]
128    pub const fn field_path(&self) -> &[String] {
129        self.field_path.as_slice()
130    }
131
132    /// Return the execution status captured by this DDL report.
133    #[must_use]
134    pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
135        self.execution_status
136    }
137
138    /// Return rows scanned by DDL execution.
139    #[must_use]
140    pub const fn rows_scanned(&self) -> usize {
141        self.rows_scanned
142    }
143
144    /// Return index keys written by DDL execution.
145    #[must_use]
146    pub const fn index_keys_written(&self) -> usize {
147        self.index_keys_written
148    }
149
150    pub(in crate::db) const fn with_execution_status(
151        mut self,
152        execution_status: SqlDdlExecutionStatus,
153    ) -> Self {
154        self.execution_status = execution_status;
155        self
156    }
157
158    pub(in crate::db) const fn with_execution_metrics(
159        mut self,
160        rows_scanned: usize,
161        index_keys_written: usize,
162    ) -> Self {
163        self.rows_scanned = rows_scanned;
164        self.index_keys_written = index_keys_written;
165        self
166    }
167}
168
169///
170/// SqlDdlMutationKind
171///
172/// Developer-facing SQL DDL mutation kind.
173///
174#[derive(Clone, Copy, Debug, Eq, PartialEq)]
175pub enum SqlDdlMutationKind {
176    AddDefaultedField,
177    AddNullableField,
178    SetFieldDefault,
179    DropFieldDefault,
180    SetFieldNotNull,
181    DropFieldNotNull,
182    DropField,
183    RenameField,
184    AddFieldPathIndex,
185    AddExpressionIndex,
186    DropSecondaryIndex,
187}
188
189impl SqlDdlMutationKind {
190    /// Return the stable diagnostic label for this DDL mutation kind.
191    #[must_use]
192    pub const fn as_str(self) -> &'static str {
193        match self {
194            Self::AddDefaultedField => "add_defaulted_field",
195            Self::AddNullableField => "add_nullable_field",
196            Self::SetFieldDefault => "set_field_default",
197            Self::DropFieldDefault => "drop_field_default",
198            Self::SetFieldNotNull => "set_field_not_null",
199            Self::DropFieldNotNull => "drop_field_not_null",
200            Self::DropField => "drop_field",
201            Self::RenameField => "rename_field",
202            Self::AddFieldPathIndex => "add_field_path_index",
203            Self::AddExpressionIndex => "add_expression_index",
204            Self::DropSecondaryIndex => "drop_secondary_index",
205        }
206    }
207}
208
209///
210/// SqlDdlExecutionStatus
211///
212/// SQL DDL execution state at the current boundary.
213///
214#[derive(Clone, Copy, Debug, Eq, PartialEq)]
215pub enum SqlDdlExecutionStatus {
216    PreparedOnly,
217    Published,
218    NoOp,
219}
220
221impl SqlDdlExecutionStatus {
222    /// Return the stable diagnostic label for this execution status.
223    #[must_use]
224    pub const fn as_str(self) -> &'static str {
225        match self {
226            Self::PreparedOnly => "prepared_only",
227            Self::Published => "published",
228            Self::NoOp => "no_op",
229        }
230    }
231}
232
233///
234/// BoundSqlDdlRequest
235///
236/// Accepted-catalog SQL DDL request after parser syntax has been resolved
237/// against one runtime schema snapshot.
238///
239#[derive(Clone, Debug, Eq, PartialEq)]
240pub(in crate::db) struct BoundSqlDdlRequest {
241    statement: BoundSqlDdlStatement,
242}
243
244impl BoundSqlDdlRequest {
245    /// Borrow the bound statement payload.
246    #[must_use]
247    pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
248        &self.statement
249    }
250}
251
252///
253/// BoundSqlDdlStatement
254///
255/// Catalog-resolved DDL statement vocabulary.
256///
257#[derive(Clone, Debug, Eq, PartialEq)]
258pub(in crate::db) enum BoundSqlDdlStatement {
259    AddColumn(BoundSqlAddColumnRequest),
260    AlterColumnDefault(BoundSqlAlterColumnDefaultRequest),
261    AlterColumnNullability(BoundSqlAlterColumnNullabilityRequest),
262    DropColumn(BoundSqlDropColumnRequest),
263    RenameColumn(BoundSqlRenameColumnRequest),
264    CreateIndex(BoundSqlCreateIndexRequest),
265    DropIndex(BoundSqlDropIndexRequest),
266    NoOp(BoundSqlDdlNoOpRequest),
267}
268
269///
270/// BoundSqlAddColumnRequest
271///
272/// Catalog-resolved additive field DDL request.
273///
274#[derive(Clone, Debug, Eq, PartialEq)]
275pub(in crate::db) struct BoundSqlAddColumnRequest {
276    entity_name: String,
277    field: PersistedFieldSnapshot,
278}
279
280impl BoundSqlAddColumnRequest {
281    /// Borrow the accepted entity name.
282    #[must_use]
283    pub(in crate::db) const fn entity_name(&self) -> &str {
284        self.entity_name.as_str()
285    }
286
287    /// Borrow the accepted DDL-owned field snapshot to publish.
288    #[must_use]
289    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
290        &self.field
291    }
292}
293
294///
295/// BoundSqlAlterColumnDefaultRequest
296///
297/// Catalog-resolved field-default metadata DDL request.
298///
299#[derive(Clone, Debug, Eq, PartialEq)]
300pub(in crate::db) struct BoundSqlAlterColumnDefaultRequest {
301    entity_name: String,
302    field: PersistedFieldSnapshot,
303    default: SchemaFieldDefault,
304    mutation_kind: SqlDdlMutationKind,
305}
306
307impl BoundSqlAlterColumnDefaultRequest {
308    /// Borrow the accepted entity name.
309    #[must_use]
310    pub(in crate::db) const fn entity_name(&self) -> &str {
311        self.entity_name.as_str()
312    }
313
314    /// Borrow the accepted field name.
315    #[must_use]
316    pub(in crate::db) const fn field_name(&self) -> &str {
317        self.field.name()
318    }
319
320    /// Borrow the accepted field whose default will change.
321    #[must_use]
322    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
323        &self.field
324    }
325
326    /// Borrow the default contract to publish.
327    #[must_use]
328    pub(in crate::db) const fn default(&self) -> &SchemaFieldDefault {
329        &self.default
330    }
331
332    /// Return the field-default mutation kind.
333    #[must_use]
334    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
335        self.mutation_kind
336    }
337}
338
339///
340/// BoundSqlAlterColumnNullabilityRequest
341///
342/// Catalog-resolved field-nullability metadata DDL request.
343///
344#[derive(Clone, Debug, Eq, PartialEq)]
345pub(in crate::db) struct BoundSqlAlterColumnNullabilityRequest {
346    entity_name: String,
347    field: PersistedFieldSnapshot,
348    nullable: bool,
349    mutation_kind: SqlDdlMutationKind,
350}
351
352impl BoundSqlAlterColumnNullabilityRequest {
353    /// Borrow the accepted entity name.
354    #[must_use]
355    pub(in crate::db) const fn entity_name(&self) -> &str {
356        self.entity_name.as_str()
357    }
358
359    /// Borrow the accepted field name.
360    #[must_use]
361    pub(in crate::db) const fn field_name(&self) -> &str {
362        self.field.name()
363    }
364
365    /// Borrow the accepted field whose nullability will change.
366    #[must_use]
367    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
368        &self.field
369    }
370
371    /// Return the nullable contract to publish.
372    #[must_use]
373    pub(in crate::db) const fn nullable(&self) -> bool {
374        self.nullable
375    }
376
377    /// Return the field-nullability mutation kind.
378    #[must_use]
379    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
380        self.mutation_kind
381    }
382}
383
384///
385/// BoundSqlDropColumnRequest
386///
387/// Catalog-resolved retained-slot field removal DDL request.
388///
389#[derive(Clone, Debug, Eq, PartialEq)]
390pub(in crate::db) struct BoundSqlDropColumnRequest {
391    entity_name: String,
392    field: PersistedFieldSnapshot,
393}
394
395impl BoundSqlDropColumnRequest {
396    /// Borrow the accepted entity name.
397    #[must_use]
398    pub(in crate::db) const fn entity_name(&self) -> &str {
399        self.entity_name.as_str()
400    }
401
402    /// Borrow the accepted field name.
403    #[must_use]
404    pub(in crate::db) const fn field_name(&self) -> &str {
405        self.field.name()
406    }
407
408    /// Borrow the accepted DDL-owned field that will be retired.
409    #[must_use]
410    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
411        &self.field
412    }
413}
414
415///
416/// BoundSqlRenameColumnRequest
417///
418/// Catalog-resolved field-rename metadata DDL request.
419///
420#[derive(Clone, Debug, Eq, PartialEq)]
421pub(in crate::db) struct BoundSqlRenameColumnRequest {
422    entity_name: String,
423    field: PersistedFieldSnapshot,
424    new_name: String,
425}
426
427impl BoundSqlRenameColumnRequest {
428    /// Borrow the accepted entity name.
429    #[must_use]
430    pub(in crate::db) const fn entity_name(&self) -> &str {
431        self.entity_name.as_str()
432    }
433
434    /// Borrow the accepted source field name.
435    #[must_use]
436    pub(in crate::db) const fn old_name(&self) -> &str {
437        self.field.name()
438    }
439
440    /// Borrow the accepted target field name.
441    #[must_use]
442    pub(in crate::db) const fn new_name(&self) -> &str {
443        self.new_name.as_str()
444    }
445
446    /// Borrow the accepted source field.
447    #[must_use]
448    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
449        &self.field
450    }
451}
452
453///
454/// BoundSqlDdlNoOpRequest
455///
456/// Catalog-resolved idempotent DDL request that is already satisfied or absent.
457///
458#[derive(Clone, Debug, Eq, PartialEq)]
459pub(in crate::db) struct BoundSqlDdlNoOpRequest {
460    mutation_kind: SqlDdlMutationKind,
461    index_name: String,
462    entity_name: String,
463    target_store: String,
464    field_path: Vec<String>,
465}
466
467impl BoundSqlDdlNoOpRequest {
468    /// Return the user-facing mutation family this no-op belongs to.
469    #[must_use]
470    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
471        self.mutation_kind
472    }
473
474    /// Borrow the requested index name.
475    #[must_use]
476    pub(in crate::db) const fn index_name(&self) -> &str {
477        self.index_name.as_str()
478    }
479
480    /// Borrow the accepted entity name that owns this request.
481    #[must_use]
482    pub(in crate::db) const fn entity_name(&self) -> &str {
483        self.entity_name.as_str()
484    }
485
486    /// Borrow the accepted index store path, or `-` when no target exists.
487    #[must_use]
488    pub(in crate::db) const fn target_store(&self) -> &str {
489        self.target_store.as_str()
490    }
491
492    /// Borrow the target field path, empty when no target exists.
493    #[must_use]
494    pub(in crate::db) const fn field_path(&self) -> &[String] {
495        self.field_path.as_slice()
496    }
497}
498
499///
500/// BoundSqlCreateIndexRequest
501///
502/// Catalog-resolved request for adding one secondary index.
503///
504#[derive(Clone, Debug, Eq, PartialEq)]
505pub(in crate::db) struct BoundSqlCreateIndexRequest {
506    index_name: String,
507    entity_name: String,
508    key_items: Vec<BoundSqlDdlCreateIndexKey>,
509    field_paths: Vec<BoundSqlDdlFieldPath>,
510    candidate_index: PersistedIndexSnapshot,
511}
512
513impl BoundSqlCreateIndexRequest {
514    /// Borrow the requested index name.
515    #[must_use]
516    pub(in crate::db) const fn index_name(&self) -> &str {
517        self.index_name.as_str()
518    }
519
520    /// Borrow the accepted entity name that owns this request.
521    #[must_use]
522    pub(in crate::db) const fn entity_name(&self) -> &str {
523        self.entity_name.as_str()
524    }
525
526    /// Borrow the accepted field-path targets.
527    #[must_use]
528    pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
529        self.field_paths.as_slice()
530    }
531
532    /// Borrow the accepted key targets in DDL key order.
533    #[must_use]
534    pub(in crate::db) const fn key_items(&self) -> &[BoundSqlDdlCreateIndexKey] {
535        self.key_items.as_slice()
536    }
537
538    /// Borrow the candidate accepted index snapshot for mutation admission.
539    #[must_use]
540    pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
541        &self.candidate_index
542    }
543}
544
545///
546/// BoundSqlDropIndexRequest
547///
548/// Catalog-resolved request for dropping one DDL-published secondary index.
549///
550#[derive(Clone, Debug, Eq, PartialEq)]
551pub(in crate::db) struct BoundSqlDropIndexRequest {
552    index_name: String,
553    entity_name: String,
554    dropped_index: PersistedIndexSnapshot,
555    field_path: Vec<String>,
556}
557
558impl BoundSqlDropIndexRequest {
559    /// Borrow the requested index name.
560    #[must_use]
561    pub(in crate::db) const fn index_name(&self) -> &str {
562        self.index_name.as_str()
563    }
564
565    /// Borrow the accepted entity name that owns this request.
566    #[must_use]
567    pub(in crate::db) const fn entity_name(&self) -> &str {
568        self.entity_name.as_str()
569    }
570
571    /// Borrow the accepted index snapshot that will be removed.
572    #[must_use]
573    pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
574        &self.dropped_index
575    }
576
577    /// Borrow the dropped field-path target.
578    #[must_use]
579    pub(in crate::db) const fn field_path(&self) -> &[String] {
580        self.field_path.as_slice()
581    }
582}
583
584///
585/// BoundSqlDdlFieldPath
586///
587/// Accepted field-path target for SQL DDL binding.
588///
589#[derive(Clone, Debug, Eq, PartialEq)]
590pub(in crate::db) struct BoundSqlDdlFieldPath {
591    root: String,
592    segments: Vec<String>,
593    accepted_path: Vec<String>,
594}
595
596impl BoundSqlDdlFieldPath {
597    /// Borrow the top-level field name.
598    #[must_use]
599    pub(in crate::db) const fn root(&self) -> &str {
600        self.root.as_str()
601    }
602
603    /// Borrow nested path segments below the top-level field.
604    #[must_use]
605    pub(in crate::db) const fn segments(&self) -> &[String] {
606        self.segments.as_slice()
607    }
608
609    /// Borrow the full accepted field path used by index metadata.
610    #[must_use]
611    pub(in crate::db) const fn accepted_path(&self) -> &[String] {
612        self.accepted_path.as_slice()
613    }
614}
615
616///
617/// SqlDdlBindError
618///
619/// Typed fail-closed reasons for SQL DDL catalog binding.
620///
621#[derive(Debug, Eq, PartialEq, ThisError)]
622pub(in crate::db) enum SqlDdlBindError {
623    #[error("SQL DDL binder requires a DDL statement")]
624    NotDdl,
625
626    #[error("accepted schema does not expose an entity name")]
627    MissingEntityName,
628
629    #[error("accepted schema does not expose an entity path")]
630    MissingEntityPath,
631
632    #[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
633    EntityMismatch {
634        sql_entity: String,
635        expected_entity: String,
636    },
637
638    #[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
639    UnknownFieldPath {
640        entity_name: String,
641        field_path: String,
642    },
643
644    #[error("field path '{field_path}' is not indexable")]
645    FieldPathNotIndexable { field_path: String },
646
647    #[error("field path '{field_path}' depends on generated-only metadata")]
648    FieldPathNotAcceptedCatalogBacked { field_path: String },
649
650    #[error("invalid filtered index predicate: {detail}")]
651    InvalidFilteredIndexPredicate { detail: String },
652
653    #[error("index name '{index_name}' already exists in the accepted schema")]
654    DuplicateIndexName { index_name: String },
655
656    #[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
657    DuplicateFieldPathIndex {
658        field_path: String,
659        existing_index: String,
660    },
661
662    #[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
663    UnknownIndex {
664        entity_name: String,
665        index_name: String,
666    },
667
668    #[error(
669        "index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
670    )]
671    GeneratedIndexDropRejected { index_name: String },
672
673    #[error(
674        "index '{index_name}' is not a supported DDL-droppable secondary index; SQL DDL can currently drop only indexes created through SQL DDL"
675    )]
676    UnsupportedDropIndex { index_name: String },
677
678    #[error(
679        "SQL DDL ALTER TABLE ADD COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
680    )]
681    UnsupportedAlterTableAddColumn {
682        entity_name: String,
683        column_name: String,
684    },
685
686    #[error(
687        "SQL DDL ALTER TABLE ADD COLUMN DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
688    )]
689    InvalidAlterTableAddColumnDefault {
690        entity_name: String,
691        column_name: String,
692        detail: String,
693    },
694
695    #[error(
696        "SQL DDL ALTER TABLE ADD COLUMN NOT NULL is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
697    )]
698    UnsupportedAlterTableAddColumnNotNull {
699        entity_name: String,
700        column_name: String,
701    },
702
703    #[error("field '{column_name}' already exists in accepted entity '{entity_name}'")]
704    DuplicateColumn {
705        entity_name: String,
706        column_name: String,
707    },
708
709    #[error(
710        "SQL DDL ALTER TABLE ADD COLUMN type '{column_type}' is not supported yet for accepted entity '{entity_name}' column '{column_name}'"
711    )]
712    UnsupportedAlterTableAddColumnType {
713        entity_name: String,
714        column_name: String,
715        column_type: String,
716    },
717
718    #[error("unknown column '{column_name}' for accepted entity '{entity_name}'")]
719    UnknownColumn {
720        entity_name: String,
721        column_name: String,
722    },
723
724    #[error(
725        "SQL DDL ALTER TABLE ALTER COLUMN {action} is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
726    )]
727    UnsupportedAlterTableAlterColumn {
728        entity_name: String,
729        column_name: String,
730        action: String,
731    },
732
733    #[error(
734        "SQL DDL ALTER TABLE ALTER COLUMN SET DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
735    )]
736    InvalidAlterTableAlterColumnDefault {
737        entity_name: String,
738        column_name: String,
739        detail: String,
740    },
741
742    #[error(
743        "SQL DDL ALTER TABLE ALTER COLUMN DROP DEFAULT is not executable yet for required accepted entity '{entity_name}' column '{column_name}'"
744    )]
745    UnsupportedAlterTableDropDefaultRequired {
746        entity_name: String,
747        column_name: String,
748    },
749
750    #[error(
751        "SQL DDL ALTER TABLE ALTER COLUMN DEFAULT cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema default instead"
752    )]
753    GeneratedFieldDefaultChangeRejected {
754        entity_name: String,
755        column_name: String,
756    },
757
758    #[error(
759        "SQL DDL ALTER TABLE ALTER COLUMN NULLABILITY cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema nullability instead"
760    )]
761    GeneratedFieldNullabilityChangeRejected {
762        entity_name: String,
763        column_name: String,
764    },
765
766    #[error(
767        "SQL DDL ALTER TABLE DROP COLUMN cannot drop primary-key field '{column_name}' on entity '{entity_name}'"
768    )]
769    PrimaryKeyFieldDropRejected {
770        entity_name: String,
771        column_name: String,
772    },
773
774    #[error(
775        "SQL DDL ALTER TABLE DROP COLUMN cannot change generated accepted field '{column_name}' on entity '{entity_name}'; remove the field from the Rust schema instead"
776    )]
777    GeneratedFieldDropRejected {
778        entity_name: String,
779        column_name: String,
780    },
781
782    #[error(
783        "SQL DDL ALTER TABLE DROP COLUMN cannot drop accepted field '{column_name}' on entity '{entity_name}' while index '{index_name}' depends on it; drop dependent DDL-owned indexes first"
784    )]
785    IndexedFieldDropRejected {
786        entity_name: String,
787        column_name: String,
788        index_name: String,
789    },
790
791    #[error(
792        "SQL DDL ALTER TABLE RENAME COLUMN cannot change generated accepted field '{column_name}' on entity '{entity_name}'; rename the field in the Rust schema instead"
793    )]
794    GeneratedFieldRenameRejected {
795        entity_name: String,
796        column_name: String,
797    },
798
799    #[error(
800        "SQL DDL ALTER TABLE RENAME COLUMN is not executable for accepted entity '{entity_name}' column '{old_column_name}' to '{new_column_name}'"
801    )]
802    UnsupportedAlterTableRenameColumn {
803        entity_name: String,
804        old_column_name: String,
805        new_column_name: String,
806    },
807}
808
809///
810/// SqlDdlLoweringError
811///
812/// Typed fail-closed reasons while lowering bound DDL into schema mutation
813/// admission.
814///
815#[derive(Debug, Eq, PartialEq, ThisError)]
816pub(in crate::db) enum SqlDdlLoweringError {
817    #[error("SQL DDL lowering requires a supported DDL statement")]
818    UnsupportedStatement,
819
820    #[error("schema mutation admission rejected DDL candidate: {0:?}")]
821    MutationAdmission(SchemaDdlMutationAdmissionError),
822}
823
824///
825/// SqlDdlPrepareError
826///
827/// Typed fail-closed preparation errors for SQL DDL.
828///
829#[derive(Debug, Eq, PartialEq, ThisError)]
830pub(in crate::db) enum SqlDdlPrepareError {
831    #[error("{0}")]
832    Bind(#[from] SqlDdlBindError),
833
834    #[error("{0}")]
835    Lowering(#[from] SqlDdlLoweringError),
836}
837
838/// Prepare one parsed SQL DDL statement through every pre-execution proof.
839pub(in crate::db) fn prepare_sql_ddl_statement(
840    statement: &SqlStatement,
841    accepted_before: &AcceptedSchemaSnapshot,
842    schema: &SchemaInfo,
843    index_store_path: &'static str,
844) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
845    let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
846    let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
847        None
848    } else {
849        Some(derive_bound_sql_ddl_accepted_after(
850            accepted_before,
851            &bound,
852        )?)
853    };
854    let report = ddl_preparation_report(&bound);
855
856    Ok(PreparedSqlDdlCommand {
857        bound,
858        derivation,
859        report,
860    })
861}
862
863/// Bind one parsed SQL DDL statement against accepted catalog metadata.
864pub(in crate::db) fn bind_sql_ddl_statement(
865    statement: &SqlStatement,
866    accepted_before: &AcceptedSchemaSnapshot,
867    schema: &SchemaInfo,
868    index_store_path: &'static str,
869) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
870    let SqlStatement::Ddl(ddl) = statement else {
871        return Err(SqlDdlBindError::NotDdl);
872    };
873
874    match ddl {
875        SqlDdlStatement::CreateIndex(statement) => {
876            bind_create_index_statement(statement, schema, index_store_path)
877        }
878        SqlDdlStatement::DropIndex(statement) => {
879            bind_drop_index_statement(statement, accepted_before, schema)
880        }
881        SqlDdlStatement::AlterTableAddColumn(statement) => {
882            bind_alter_table_add_column_statement(statement, accepted_before, schema)
883        }
884        SqlDdlStatement::AlterTableAlterColumn(statement) => {
885            bind_alter_table_alter_column_statement(statement, accepted_before, schema)
886        }
887        SqlDdlStatement::AlterTableDropColumn(statement) => {
888            bind_alter_table_drop_column_statement(statement, accepted_before, schema)
889        }
890        SqlDdlStatement::AlterTableRenameColumn(statement) => {
891            bind_alter_table_rename_column_statement(statement, accepted_before, schema)
892        }
893    }
894}
895
896fn bind_create_index_statement(
897    statement: &SqlCreateIndexStatement,
898    schema: &SchemaInfo,
899    index_store_path: &'static str,
900) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
901    let entity_name = schema
902        .entity_name()
903        .ok_or(SqlDdlBindError::MissingEntityName)?;
904
905    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
906        return Err(SqlDdlBindError::EntityMismatch {
907            sql_entity: statement.entity.clone(),
908            expected_entity: entity_name.to_string(),
909        });
910    }
911
912    let key_items = statement
913        .key_items
914        .iter()
915        .map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
916        .collect::<Result<Vec<_>, _>>()?;
917    let field_paths = create_index_field_path_report_items(key_items.as_slice());
918    if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
919        if key_items_are_field_path_only(key_items.as_slice())
920            && statement.if_not_exists
921            && existing_field_path_index_matches_request(
922                existing_index,
923                field_paths.as_slice(),
924                statement.predicate_sql.as_deref(),
925                statement.uniqueness,
926            )
927        {
928            return Ok(BoundSqlDdlRequest {
929                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
930                    mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
931                    index_name: statement.name.clone(),
932                    entity_name: entity_name.to_string(),
933                    target_store: existing_index.store().to_string(),
934                    field_path: ddl_field_path_report(field_paths.as_slice()),
935                }),
936            });
937        }
938
939        return Err(SqlDdlBindError::DuplicateIndexName {
940            index_name: statement.name.clone(),
941        });
942    }
943    let predicate_sql =
944        validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
945    if let Some(existing_index) = find_expression_index_by_name(schema, statement.name.as_str()) {
946        if statement.if_not_exists
947            && existing_expression_index_matches_request(
948                existing_index,
949                key_items.as_slice(),
950                predicate_sql.as_deref(),
951                statement.uniqueness,
952            )
953        {
954            return Ok(BoundSqlDdlRequest {
955                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
956                    mutation_kind: SqlDdlMutationKind::AddExpressionIndex,
957                    index_name: statement.name.clone(),
958                    entity_name: entity_name.to_string(),
959                    target_store: existing_index.store().to_string(),
960                    field_path: ddl_key_item_report(key_items.as_slice()),
961                }),
962            });
963        }
964
965        return Err(SqlDdlBindError::DuplicateIndexName {
966            index_name: statement.name.clone(),
967        });
968    }
969    if key_items_are_field_path_only(key_items.as_slice()) {
970        reject_duplicate_field_path_index(
971            field_paths.as_slice(),
972            predicate_sql.as_deref(),
973            schema,
974        )?;
975    } else {
976        reject_duplicate_expression_index(key_items.as_slice(), predicate_sql.as_deref(), schema)?;
977    }
978    let candidate_index = candidate_index_snapshot(
979        statement.name.as_str(),
980        key_items.as_slice(),
981        predicate_sql.as_deref(),
982        statement.uniqueness,
983        schema,
984        index_store_path,
985    )?;
986
987    Ok(BoundSqlDdlRequest {
988        statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
989            index_name: statement.name.clone(),
990            entity_name: entity_name.to_string(),
991            key_items,
992            field_paths,
993            candidate_index,
994        }),
995    })
996}
997
998fn bind_drop_index_statement(
999    statement: &SqlDropIndexStatement,
1000    accepted_before: &AcceptedSchemaSnapshot,
1001    schema: &SchemaInfo,
1002) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1003    let entity_name = schema
1004        .entity_name()
1005        .ok_or(SqlDdlBindError::MissingEntityName)?;
1006
1007    if let Some(sql_entity) = statement.entity.as_deref()
1008        && !identifiers_tail_match(sql_entity, entity_name)
1009    {
1010        return Err(SqlDdlBindError::EntityMismatch {
1011            sql_entity: sql_entity.to_string(),
1012            expected_entity: entity_name.to_string(),
1013        });
1014    }
1015    let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
1016        accepted_before,
1017        &statement.name,
1018    )
1019    .map_err(|error| match error {
1020        SchemaDdlIndexDropCandidateError::Generated => {
1021            SqlDdlBindError::GeneratedIndexDropRejected {
1022                index_name: statement.name.clone(),
1023            }
1024        }
1025        SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
1026            entity_name: entity_name.to_string(),
1027            index_name: statement.name.clone(),
1028        },
1029        SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
1030            index_name: statement.name.clone(),
1031        },
1032    });
1033    let (dropped_index, field_path) = match drop_candidate {
1034        Ok((dropped_index, field_path)) => (dropped_index, field_path),
1035        Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
1036            return Ok(BoundSqlDdlRequest {
1037                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1038                    mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
1039                    index_name: statement.name.clone(),
1040                    entity_name: entity_name.to_string(),
1041                    target_store: "-".to_string(),
1042                    field_path: Vec::new(),
1043                }),
1044            });
1045        }
1046        Err(error) => return Err(error),
1047    };
1048    Ok(BoundSqlDdlRequest {
1049        statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
1050            index_name: statement.name.clone(),
1051            entity_name: entity_name.to_string(),
1052            dropped_index,
1053            field_path,
1054        }),
1055    })
1056}
1057
1058fn bind_alter_table_add_column_statement(
1059    statement: &SqlAlterTableAddColumnStatement,
1060    accepted_before: &AcceptedSchemaSnapshot,
1061    schema: &SchemaInfo,
1062) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1063    let entity_name = schema
1064        .entity_name()
1065        .ok_or(SqlDdlBindError::MissingEntityName)?;
1066
1067    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1068        return Err(SqlDdlBindError::EntityMismatch {
1069            sql_entity: statement.entity.clone(),
1070            expected_entity: entity_name.to_string(),
1071        });
1072    }
1073
1074    if schema
1075        .field_nullable(statement.column_name.as_str())
1076        .is_some()
1077    {
1078        return Err(SqlDdlBindError::DuplicateColumn {
1079            entity_name: entity_name.to_string(),
1080            column_name: statement.column_name.clone(),
1081        });
1082    }
1083
1084    let (kind, storage_decode, leaf_codec) = persisted_field_contract_for_sql_column_type(
1085        statement.column_type.as_str(),
1086    )
1087    .ok_or_else(|| SqlDdlBindError::UnsupportedAlterTableAddColumnType {
1088        entity_name: entity_name.to_string(),
1089        column_name: statement.column_name.clone(),
1090        column_type: statement.column_type.clone(),
1091    })?;
1092    let default = schema_field_default_for_sql_default(
1093        entity_name,
1094        statement.column_name.as_str(),
1095        statement.default.as_ref(),
1096        &kind,
1097        statement.nullable,
1098        storage_decode,
1099        leaf_codec,
1100    )?;
1101    if !statement.nullable && default.is_none() {
1102        return Err(SqlDdlBindError::UnsupportedAlterTableAddColumnNotNull {
1103            entity_name: entity_name.to_string(),
1104            column_name: statement.column_name.clone(),
1105        });
1106    }
1107    let field = PersistedFieldSnapshot::new_with_write_policy_and_origin(
1108        next_sql_ddl_field_id(accepted_before),
1109        statement.column_name.clone(),
1110        next_sql_ddl_field_slot(accepted_before),
1111        kind,
1112        Vec::new(),
1113        statement.nullable,
1114        default,
1115        SchemaFieldWritePolicy::from_model_policies(None, None),
1116        PersistedFieldOrigin::SqlDdl,
1117        storage_decode,
1118        leaf_codec,
1119    );
1120
1121    Ok(BoundSqlDdlRequest {
1122        statement: BoundSqlDdlStatement::AddColumn(BoundSqlAddColumnRequest {
1123            entity_name: entity_name.to_string(),
1124            field,
1125        }),
1126    })
1127}
1128
1129fn alter_table_alter_column_bind_error(
1130    statement: &SqlAlterTableAlterColumnStatement,
1131    schema: &SchemaInfo,
1132) -> SqlDdlBindError {
1133    let Some(entity_name) = schema.entity_name() else {
1134        return SqlDdlBindError::MissingEntityName;
1135    };
1136
1137    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1138        return SqlDdlBindError::EntityMismatch {
1139            sql_entity: statement.entity.clone(),
1140            expected_entity: entity_name.to_string(),
1141        };
1142    }
1143
1144    if schema
1145        .field_nullable(statement.column_name.as_str())
1146        .is_none()
1147    {
1148        return SqlDdlBindError::UnknownColumn {
1149            entity_name: entity_name.to_string(),
1150            column_name: statement.column_name.clone(),
1151        };
1152    }
1153
1154    SqlDdlBindError::UnsupportedAlterTableAlterColumn {
1155        entity_name: entity_name.to_string(),
1156        column_name: statement.column_name.clone(),
1157        action: statement.action.label().to_string(),
1158    }
1159}
1160
1161fn bind_alter_table_alter_column_statement(
1162    statement: &SqlAlterTableAlterColumnStatement,
1163    accepted_before: &AcceptedSchemaSnapshot,
1164    schema: &SchemaInfo,
1165) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1166    let Some(entity_name) = schema.entity_name() else {
1167        return Err(SqlDdlBindError::MissingEntityName);
1168    };
1169
1170    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1171        return Err(SqlDdlBindError::EntityMismatch {
1172            sql_entity: statement.entity.clone(),
1173            expected_entity: entity_name.to_string(),
1174        });
1175    }
1176
1177    let field = accepted_before
1178        .persisted_snapshot()
1179        .fields()
1180        .iter()
1181        .find(|field| field.name() == statement.column_name)
1182        .ok_or_else(|| SqlDdlBindError::UnknownColumn {
1183            entity_name: entity_name.to_string(),
1184            column_name: statement.column_name.clone(),
1185        })?;
1186
1187    match &statement.action {
1188        SqlAlterColumnAction::SetDefault(default) => {
1189            reject_generated_field_default_change(entity_name, field)?;
1190            let default =
1191                schema_field_default_for_alter_column_default(entity_name, field, default)?;
1192            Ok(bind_alter_table_alter_column_default(
1193                entity_name,
1194                field,
1195                default,
1196                SqlDdlMutationKind::SetFieldDefault,
1197            ))
1198        }
1199        SqlAlterColumnAction::DropDefault => {
1200            if !field.default().is_none() {
1201                reject_generated_field_default_change(entity_name, field)?;
1202            }
1203            if !field.nullable() && !field.default().is_none() {
1204                return Err(SqlDdlBindError::UnsupportedAlterTableDropDefaultRequired {
1205                    entity_name: entity_name.to_string(),
1206                    column_name: statement.column_name.clone(),
1207                });
1208            }
1209            Ok(bind_alter_table_alter_column_default(
1210                entity_name,
1211                field,
1212                SchemaFieldDefault::None,
1213                SqlDdlMutationKind::DropFieldDefault,
1214            ))
1215        }
1216        SqlAlterColumnAction::SetNotNull => Ok(bind_alter_table_alter_column_nullability(
1217            entity_name,
1218            field,
1219            false,
1220            SqlDdlMutationKind::SetFieldNotNull,
1221        )?),
1222        SqlAlterColumnAction::DropNotNull => Ok(bind_alter_table_alter_column_nullability(
1223            entity_name,
1224            field,
1225            true,
1226            SqlDdlMutationKind::DropFieldNotNull,
1227        )?),
1228    }
1229}
1230
1231fn bind_alter_table_drop_column_statement(
1232    statement: &SqlAlterTableDropColumnStatement,
1233    accepted_before: &AcceptedSchemaSnapshot,
1234    schema: &SchemaInfo,
1235) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1236    let entity_name = schema
1237        .entity_name()
1238        .ok_or(SqlDdlBindError::MissingEntityName)?;
1239
1240    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1241        return Err(SqlDdlBindError::EntityMismatch {
1242            sql_entity: statement.entity.clone(),
1243            expected_entity: entity_name.to_string(),
1244        });
1245    }
1246
1247    let accepted = accepted_before.persisted_snapshot();
1248    let Some(field) = accepted
1249        .fields()
1250        .iter()
1251        .find(|field| field.name() == statement.column_name)
1252    else {
1253        if statement.if_exists {
1254            return Ok(BoundSqlDdlRequest {
1255                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1256                    mutation_kind: SqlDdlMutationKind::DropField,
1257                    index_name: statement.column_name.clone(),
1258                    entity_name: entity_name.to_string(),
1259                    target_store: "-".to_string(),
1260                    field_path: vec![statement.column_name.clone()],
1261                }),
1262            });
1263        }
1264
1265        return Err(SqlDdlBindError::UnknownColumn {
1266            entity_name: entity_name.to_string(),
1267            column_name: statement.column_name.clone(),
1268        });
1269    };
1270
1271    if accepted.primary_key_field_id() == field.id() {
1272        return Err(SqlDdlBindError::PrimaryKeyFieldDropRejected {
1273            entity_name: entity_name.to_string(),
1274            column_name: statement.column_name.clone(),
1275        });
1276    }
1277
1278    if field.generated() {
1279        return Err(SqlDdlBindError::GeneratedFieldDropRejected {
1280            entity_name: entity_name.to_string(),
1281            column_name: statement.column_name.clone(),
1282        });
1283    }
1284
1285    if let Some(index_name) =
1286        resolve_sql_ddl_field_drop_dependent_index(accepted_before, field.id())
1287    {
1288        return Err(SqlDdlBindError::IndexedFieldDropRejected {
1289            entity_name: entity_name.to_string(),
1290            column_name: statement.column_name.clone(),
1291            index_name,
1292        });
1293    }
1294
1295    Ok(BoundSqlDdlRequest {
1296        statement: BoundSqlDdlStatement::DropColumn(BoundSqlDropColumnRequest {
1297            entity_name: entity_name.to_string(),
1298            field: field.clone(),
1299        }),
1300    })
1301}
1302
1303fn bind_alter_table_rename_column_statement(
1304    statement: &SqlAlterTableRenameColumnStatement,
1305    accepted_before: &AcceptedSchemaSnapshot,
1306    schema: &SchemaInfo,
1307) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1308    let entity_name = schema
1309        .entity_name()
1310        .ok_or(SqlDdlBindError::MissingEntityName)?;
1311
1312    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1313        return Err(SqlDdlBindError::EntityMismatch {
1314            sql_entity: statement.entity.clone(),
1315            expected_entity: entity_name.to_string(),
1316        });
1317    }
1318
1319    let accepted = accepted_before.persisted_snapshot();
1320    let Some(field) = accepted
1321        .fields()
1322        .iter()
1323        .find(|field| field.name() == statement.old_column_name)
1324    else {
1325        return Err(SqlDdlBindError::UnknownColumn {
1326            entity_name: entity_name.to_string(),
1327            column_name: statement.old_column_name.clone(),
1328        });
1329    };
1330
1331    if statement.old_column_name == statement.new_column_name {
1332        return Ok(BoundSqlDdlRequest {
1333            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1334                mutation_kind: SqlDdlMutationKind::RenameField,
1335                index_name: statement.old_column_name.clone(),
1336                entity_name: entity_name.to_string(),
1337                target_store: "-".to_string(),
1338                field_path: vec![statement.old_column_name.clone()],
1339            }),
1340        });
1341    }
1342
1343    if accepted
1344        .fields()
1345        .iter()
1346        .any(|field| field.name() == statement.new_column_name)
1347    {
1348        return Err(SqlDdlBindError::DuplicateColumn {
1349            entity_name: entity_name.to_string(),
1350            column_name: statement.new_column_name.clone(),
1351        });
1352    }
1353
1354    if field.generated() {
1355        return Err(SqlDdlBindError::GeneratedFieldRenameRejected {
1356            entity_name: entity_name.to_string(),
1357            column_name: statement.old_column_name.clone(),
1358        });
1359    }
1360
1361    Ok(BoundSqlDdlRequest {
1362        statement: BoundSqlDdlStatement::RenameColumn(BoundSqlRenameColumnRequest {
1363            entity_name: entity_name.to_string(),
1364            field: field.clone(),
1365            new_name: statement.new_column_name.clone(),
1366        }),
1367    })
1368}
1369
1370fn bind_alter_table_alter_column_default(
1371    entity_name: &str,
1372    field: &PersistedFieldSnapshot,
1373    default: SchemaFieldDefault,
1374    mutation_kind: SqlDdlMutationKind,
1375) -> BoundSqlDdlRequest {
1376    if field.default() == &default {
1377        return BoundSqlDdlRequest {
1378            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1379                mutation_kind,
1380                index_name: field.name().to_string(),
1381                entity_name: entity_name.to_string(),
1382                target_store: entity_name.to_string(),
1383                field_path: vec![field.name().to_string()],
1384            }),
1385        };
1386    }
1387
1388    BoundSqlDdlRequest {
1389        statement: BoundSqlDdlStatement::AlterColumnDefault(BoundSqlAlterColumnDefaultRequest {
1390            entity_name: entity_name.to_string(),
1391            field: field.clone(),
1392            default,
1393            mutation_kind,
1394        }),
1395    }
1396}
1397
1398fn reject_generated_field_default_change(
1399    entity_name: &str,
1400    field: &PersistedFieldSnapshot,
1401) -> Result<(), SqlDdlBindError> {
1402    if field.generated() {
1403        return Err(SqlDdlBindError::GeneratedFieldDefaultChangeRejected {
1404            entity_name: entity_name.to_string(),
1405            column_name: field.name().to_string(),
1406        });
1407    }
1408
1409    Ok(())
1410}
1411
1412fn bind_alter_table_alter_column_nullability(
1413    entity_name: &str,
1414    field: &PersistedFieldSnapshot,
1415    nullable: bool,
1416    mutation_kind: SqlDdlMutationKind,
1417) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1418    if field.nullable() == nullable {
1419        return Ok(BoundSqlDdlRequest {
1420            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1421                mutation_kind,
1422                index_name: field.name().to_string(),
1423                entity_name: entity_name.to_string(),
1424                target_store: entity_name.to_string(),
1425                field_path: vec![field.name().to_string()],
1426            }),
1427        });
1428    }
1429
1430    reject_generated_field_nullability_change(entity_name, field)?;
1431
1432    Ok(BoundSqlDdlRequest {
1433        statement: BoundSqlDdlStatement::AlterColumnNullability(
1434            BoundSqlAlterColumnNullabilityRequest {
1435                entity_name: entity_name.to_string(),
1436                field: field.clone(),
1437                nullable,
1438                mutation_kind,
1439            },
1440        ),
1441    })
1442}
1443
1444fn reject_generated_field_nullability_change(
1445    entity_name: &str,
1446    field: &PersistedFieldSnapshot,
1447) -> Result<(), SqlDdlBindError> {
1448    if field.generated() {
1449        return Err(SqlDdlBindError::GeneratedFieldNullabilityChangeRejected {
1450            entity_name: entity_name.to_string(),
1451            column_name: field.name().to_string(),
1452        });
1453    }
1454
1455    Ok(())
1456}
1457
1458fn schema_field_default_for_sql_default(
1459    entity_name: &str,
1460    column_name: &str,
1461    default: Option<&crate::value::Value>,
1462    kind: &PersistedFieldKind,
1463    nullable: bool,
1464    storage_decode: FieldStorageDecode,
1465    leaf_codec: LeafCodec,
1466) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1467    let Some(default) = default else {
1468        return Ok(SchemaFieldDefault::None);
1469    };
1470    if matches!(default, crate::value::Value::Null) {
1471        return Err(SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1472            entity_name: entity_name.to_string(),
1473            column_name: column_name.to_string(),
1474            detail: "NULL cannot be used as an accepted database default".to_string(),
1475        });
1476    }
1477
1478    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(kind, default)
1479        .unwrap_or_else(|| default.clone());
1480    let contract =
1481        AcceptedFieldDecodeContract::new(column_name, kind, nullable, storage_decode, leaf_codec);
1482    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1483        |error| SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1484            entity_name: entity_name.to_string(),
1485            column_name: column_name.to_string(),
1486            detail: error.to_string(),
1487        },
1488    )?;
1489
1490    Ok(SchemaFieldDefault::SlotPayload(payload))
1491}
1492
1493fn schema_field_default_for_alter_column_default(
1494    entity_name: &str,
1495    field: &PersistedFieldSnapshot,
1496    default: &crate::value::Value,
1497) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1498    if matches!(default, crate::value::Value::Null) {
1499        return Err(SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1500            entity_name: entity_name.to_string(),
1501            column_name: field.name().to_string(),
1502            detail: "NULL cannot be used as an accepted database default".to_string(),
1503        });
1504    }
1505
1506    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(field.kind(), default)
1507        .unwrap_or_else(|| default.clone());
1508    let contract = AcceptedFieldDecodeContract::new(
1509        field.name(),
1510        field.kind(),
1511        field.nullable(),
1512        field.storage_decode(),
1513        field.leaf_codec(),
1514    );
1515    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1516        |error| SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1517            entity_name: entity_name.to_string(),
1518            column_name: field.name().to_string(),
1519            detail: error.to_string(),
1520        },
1521    )?;
1522
1523    Ok(SchemaFieldDefault::SlotPayload(payload))
1524}
1525
1526fn next_sql_ddl_field_id(accepted_before: &AcceptedSchemaSnapshot) -> FieldId {
1527    let snapshot = accepted_before.persisted_snapshot();
1528    let next = snapshot
1529        .fields()
1530        .iter()
1531        .map(|field| field.id().get())
1532        .chain(
1533            snapshot
1534                .row_layout()
1535                .retired_field_slots()
1536                .iter()
1537                .map(|(field_id, _)| field_id.get()),
1538        )
1539        .max()
1540        .unwrap_or(0)
1541        .checked_add(1)
1542        .expect("accepted field IDs should not be exhausted");
1543
1544    FieldId::new(next)
1545}
1546
1547fn next_sql_ddl_field_slot(accepted_before: &AcceptedSchemaSnapshot) -> SchemaFieldSlot {
1548    accepted_before
1549        .persisted_snapshot()
1550        .row_layout()
1551        .next_unallocated_slot()
1552}
1553
1554fn persisted_field_contract_for_sql_column_type(
1555    column_type: &str,
1556) -> Option<(PersistedFieldKind, FieldStorageDecode, LeafCodec)> {
1557    let normalized = column_type.trim().to_ascii_lowercase();
1558    match normalized.as_str() {
1559        "bool" | "boolean" => Some((
1560            PersistedFieldKind::Bool,
1561            FieldStorageDecode::ByKind,
1562            LeafCodec::Scalar(ScalarCodec::Bool),
1563        )),
1564        "int" | "integer" => Some((
1565            PersistedFieldKind::Int,
1566            FieldStorageDecode::ByKind,
1567            LeafCodec::Scalar(ScalarCodec::Int64),
1568        )),
1569        "nat" | "natural" => Some((
1570            PersistedFieldKind::Nat,
1571            FieldStorageDecode::ByKind,
1572            LeafCodec::Scalar(ScalarCodec::Nat64),
1573        )),
1574        "text" | "string" => Some((
1575            PersistedFieldKind::Text { max_len: None },
1576            FieldStorageDecode::ByKind,
1577            LeafCodec::Scalar(ScalarCodec::Text),
1578        )),
1579        _ => None,
1580    }
1581}
1582
1583#[derive(Clone, Debug, Eq, PartialEq)]
1584pub(in crate::db) enum BoundSqlDdlCreateIndexKey {
1585    FieldPath(BoundSqlDdlFieldPath),
1586    Expression(BoundSqlDdlExpressionKey),
1587}
1588
1589///
1590/// BoundSqlDdlExpressionKey
1591///
1592/// Accepted expression-index key target for SQL DDL binding.
1593///
1594#[derive(Clone, Debug, Eq, PartialEq)]
1595pub(in crate::db) struct BoundSqlDdlExpressionKey {
1596    op: PersistedIndexExpressionOp,
1597    source: BoundSqlDdlFieldPath,
1598    canonical_sql: String,
1599}
1600
1601impl BoundSqlDdlExpressionKey {
1602    /// Return the accepted expression operation.
1603    #[must_use]
1604    pub(in crate::db) const fn op(&self) -> PersistedIndexExpressionOp {
1605        self.op
1606    }
1607
1608    /// Borrow the accepted source field path.
1609    #[must_use]
1610    pub(in crate::db) const fn source(&self) -> &BoundSqlDdlFieldPath {
1611        &self.source
1612    }
1613
1614    /// Borrow the SQL-facing canonical expression text.
1615    #[must_use]
1616    pub(in crate::db) const fn canonical_sql(&self) -> &str {
1617        self.canonical_sql.as_str()
1618    }
1619}
1620
1621fn bind_create_index_key_item(
1622    key_item: &SqlCreateIndexKeyItem,
1623    entity_name: &str,
1624    schema: &SchemaInfo,
1625) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1626    match key_item {
1627        SqlCreateIndexKeyItem::FieldPath(field_path) => {
1628            bind_create_index_field_path(field_path.as_str(), entity_name, schema)
1629                .map(BoundSqlDdlCreateIndexKey::FieldPath)
1630        }
1631        SqlCreateIndexKeyItem::Expression(expression) => {
1632            bind_create_index_expression_key(expression, entity_name, schema)
1633        }
1634    }
1635}
1636
1637fn bind_create_index_expression_key(
1638    expression: &SqlCreateIndexExpressionKey,
1639    entity_name: &str,
1640    schema: &SchemaInfo,
1641) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1642    let source = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
1643
1644    Ok(BoundSqlDdlCreateIndexKey::Expression(
1645        BoundSqlDdlExpressionKey {
1646            op: expression_op_from_sql_function(expression.function),
1647            source,
1648            canonical_sql: expression.canonical_sql(),
1649        },
1650    ))
1651}
1652
1653const fn expression_op_from_sql_function(
1654    function: crate::db::sql::parser::SqlCreateIndexExpressionFunction,
1655) -> PersistedIndexExpressionOp {
1656    match function {
1657        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Lower => {
1658            PersistedIndexExpressionOp::Lower
1659        }
1660        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Upper => {
1661            PersistedIndexExpressionOp::Upper
1662        }
1663        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Trim => {
1664            PersistedIndexExpressionOp::Trim
1665        }
1666    }
1667}
1668
1669fn key_items_are_field_path_only(key_items: &[BoundSqlDdlCreateIndexKey]) -> bool {
1670    key_items
1671        .iter()
1672        .all(|key_item| matches!(key_item, BoundSqlDdlCreateIndexKey::FieldPath(_)))
1673}
1674
1675fn create_index_field_path_report_items(
1676    key_items: &[BoundSqlDdlCreateIndexKey],
1677) -> Vec<BoundSqlDdlFieldPath> {
1678    key_items
1679        .iter()
1680        .map(|key_item| match key_item {
1681            BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.clone(),
1682            BoundSqlDdlCreateIndexKey::Expression(expression) => expression.source().clone(),
1683        })
1684        .collect()
1685}
1686
1687fn bind_create_index_field_path(
1688    field_path: &str,
1689    entity_name: &str,
1690    schema: &SchemaInfo,
1691) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
1692    let mut path = field_path
1693        .split('.')
1694        .map(str::trim)
1695        .filter(|segment| !segment.is_empty());
1696    let Some(root) = path.next() else {
1697        return Err(SqlDdlBindError::UnknownFieldPath {
1698            entity_name: entity_name.to_string(),
1699            field_path: field_path.to_string(),
1700        });
1701    };
1702    let segments = path.map(str::to_string).collect::<Vec<_>>();
1703
1704    let capabilities = if segments.is_empty() {
1705        schema.sql_capabilities(root)
1706    } else {
1707        schema.nested_sql_capabilities(root, segments.as_slice())
1708    }
1709    .ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
1710        entity_name: entity_name.to_string(),
1711        field_path: field_path.to_string(),
1712    })?;
1713
1714    if !capabilities.orderable() {
1715        return Err(SqlDdlBindError::FieldPathNotIndexable {
1716            field_path: field_path.to_string(),
1717        });
1718    }
1719
1720    let mut accepted_path = Vec::with_capacity(segments.len() + 1);
1721    accepted_path.push(root.to_string());
1722    accepted_path.extend(segments.iter().cloned());
1723
1724    Ok(BoundSqlDdlFieldPath {
1725        root: root.to_string(),
1726        segments,
1727        accepted_path,
1728    })
1729}
1730
1731fn find_field_path_index_by_name<'a>(
1732    schema: &'a SchemaInfo,
1733    index_name: &str,
1734) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
1735    schema
1736        .field_path_indexes()
1737        .iter()
1738        .find(|index| index.name() == index_name)
1739}
1740
1741fn existing_field_path_index_matches_request(
1742    index: &crate::db::schema::SchemaIndexInfo,
1743    field_paths: &[BoundSqlDdlFieldPath],
1744    predicate_sql: Option<&str>,
1745    uniqueness: SqlCreateIndexUniqueness,
1746) -> bool {
1747    let fields = index.fields();
1748
1749    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1750        && index.predicate_sql() == predicate_sql
1751        && fields.len() == field_paths.len()
1752        && fields
1753            .iter()
1754            .zip(field_paths)
1755            .all(|(field, requested)| field.path() == requested.accepted_path())
1756}
1757
1758fn find_expression_index_by_name<'a>(
1759    schema: &'a SchemaInfo,
1760    index_name: &str,
1761) -> Option<&'a SchemaExpressionIndexInfo> {
1762    schema
1763        .expression_indexes()
1764        .iter()
1765        .find(|index| index.name() == index_name)
1766}
1767
1768fn existing_expression_index_matches_request(
1769    index: &SchemaExpressionIndexInfo,
1770    key_items: &[BoundSqlDdlCreateIndexKey],
1771    predicate_sql: Option<&str>,
1772    uniqueness: SqlCreateIndexUniqueness,
1773) -> bool {
1774    let existing_key_items = index.key_items();
1775
1776    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1777        && index.predicate_sql() == predicate_sql
1778        && existing_key_items.len() == key_items.len()
1779        && existing_key_items
1780            .iter()
1781            .zip(key_items)
1782            .all(existing_expression_key_item_matches_request)
1783}
1784
1785fn existing_expression_key_item_matches_request(
1786    existing: (
1787        &SchemaExpressionIndexKeyItemInfo,
1788        &BoundSqlDdlCreateIndexKey,
1789    ),
1790) -> bool {
1791    let (existing, requested) = existing;
1792    match (existing, requested) {
1793        (
1794            SchemaExpressionIndexKeyItemInfo::FieldPath(existing),
1795            BoundSqlDdlCreateIndexKey::FieldPath(requested),
1796        ) => existing.path() == requested.accepted_path(),
1797        (
1798            SchemaExpressionIndexKeyItemInfo::Expression(existing),
1799            BoundSqlDdlCreateIndexKey::Expression(requested),
1800        ) => existing_expression_component_matches_request(
1801            existing.op(),
1802            existing.source().path(),
1803            existing.canonical_text(),
1804            requested,
1805        ),
1806        _ => false,
1807    }
1808}
1809
1810fn existing_expression_component_matches_request(
1811    existing_op: PersistedIndexExpressionOp,
1812    existing_path: &[String],
1813    existing_canonical_text: &str,
1814    requested: &BoundSqlDdlExpressionKey,
1815) -> bool {
1816    let requested_path = requested.source().accepted_path();
1817    let requested_canonical_text = format!("expr:v1:{}", requested.canonical_sql());
1818
1819    existing_op == requested.op()
1820        && existing_path == requested_path
1821        && existing_canonical_text == requested_canonical_text
1822}
1823
1824fn reject_duplicate_expression_index(
1825    key_items: &[BoundSqlDdlCreateIndexKey],
1826    predicate_sql: Option<&str>,
1827    schema: &SchemaInfo,
1828) -> Result<(), SqlDdlBindError> {
1829    let Some(existing_index) = schema.expression_indexes().iter().find(|index| {
1830        existing_expression_index_matches_request(
1831            index,
1832            key_items,
1833            predicate_sql,
1834            if index.unique() {
1835                SqlCreateIndexUniqueness::Unique
1836            } else {
1837                SqlCreateIndexUniqueness::NonUnique
1838            },
1839        )
1840    }) else {
1841        return Ok(());
1842    };
1843
1844    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1845        field_path: ddl_key_item_report(key_items).join(","),
1846        existing_index: existing_index.name().to_string(),
1847    })
1848}
1849
1850fn reject_duplicate_field_path_index(
1851    field_paths: &[BoundSqlDdlFieldPath],
1852    predicate_sql: Option<&str>,
1853    schema: &SchemaInfo,
1854) -> Result<(), SqlDdlBindError> {
1855    let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
1856        let fields = index.fields();
1857        index.predicate_sql() == predicate_sql
1858            && fields.len() == field_paths.len()
1859            && fields
1860                .iter()
1861                .zip(field_paths)
1862                .all(|(field, requested)| field.path() == requested.accepted_path())
1863    }) else {
1864        return Ok(());
1865    };
1866
1867    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1868        field_path: ddl_field_path_report(field_paths).join(","),
1869        existing_index: existing_index.name().to_string(),
1870    })
1871}
1872
1873fn candidate_index_snapshot(
1874    index_name: &str,
1875    key_items: &[BoundSqlDdlCreateIndexKey],
1876    predicate_sql: Option<&str>,
1877    uniqueness: SqlCreateIndexUniqueness,
1878    schema: &SchemaInfo,
1879    index_store_path: &'static str,
1880) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
1881    let key = if key_items_are_field_path_only(key_items) {
1882        PersistedIndexKeySnapshot::FieldPath(
1883            key_items
1884                .iter()
1885                .map(|key_item| {
1886                    let BoundSqlDdlCreateIndexKey::FieldPath(field_path) = key_item else {
1887                        unreachable!("field-path-only index checked before field-path lowering");
1888                    };
1889
1890                    accepted_index_field_path_snapshot(schema, field_path)
1891                })
1892                .collect::<Result<Vec<_>, _>>()?,
1893        )
1894    } else {
1895        PersistedIndexKeySnapshot::Items(
1896            key_items
1897                .iter()
1898                .map(|key_item| match key_item {
1899                    BoundSqlDdlCreateIndexKey::FieldPath(field_path) => {
1900                        accepted_index_field_path_snapshot(schema, field_path)
1901                            .map(PersistedIndexKeyItemSnapshot::FieldPath)
1902                    }
1903                    BoundSqlDdlCreateIndexKey::Expression(expression) => {
1904                        accepted_index_expression_snapshot(schema, expression)
1905                    }
1906                })
1907                .collect::<Result<Vec<_>, _>>()?,
1908        )
1909    };
1910
1911    Ok(PersistedIndexSnapshot::new_sql_ddl(
1912        schema.next_secondary_index_ordinal(),
1913        index_name.to_string(),
1914        index_store_path.to_string(),
1915        matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
1916        key,
1917        predicate_sql.map(str::to_string),
1918    ))
1919}
1920
1921fn accepted_index_field_path_snapshot(
1922    schema: &SchemaInfo,
1923    field_path: &BoundSqlDdlFieldPath,
1924) -> Result<crate::db::schema::PersistedIndexFieldPathSnapshot, SqlDdlBindError> {
1925    schema
1926        .accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
1927        .ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
1928            field_path: field_path.accepted_path().join("."),
1929        })
1930}
1931
1932fn accepted_index_expression_snapshot(
1933    schema: &SchemaInfo,
1934    expression: &BoundSqlDdlExpressionKey,
1935) -> Result<PersistedIndexKeyItemSnapshot, SqlDdlBindError> {
1936    let source = accepted_index_field_path_snapshot(schema, expression.source())?;
1937    let Some(output_kind) = expression_output_kind(expression.op(), source.kind()) else {
1938        return Err(SqlDdlBindError::FieldPathNotIndexable {
1939            field_path: expression.source().accepted_path().join("."),
1940        });
1941    };
1942
1943    Ok(PersistedIndexKeyItemSnapshot::Expression(Box::new(
1944        PersistedIndexExpressionSnapshot::new(
1945            expression.op(),
1946            source.clone(),
1947            source.kind().clone(),
1948            output_kind,
1949            format!("expr:v1:{}", expression.canonical_sql()),
1950        ),
1951    )))
1952}
1953
1954fn expression_output_kind(
1955    op: PersistedIndexExpressionOp,
1956    source_kind: &PersistedFieldKind,
1957) -> Option<PersistedFieldKind> {
1958    match op {
1959        PersistedIndexExpressionOp::Lower
1960        | PersistedIndexExpressionOp::Upper
1961        | PersistedIndexExpressionOp::Trim
1962        | PersistedIndexExpressionOp::LowerTrim => {
1963            if matches!(source_kind, PersistedFieldKind::Text { .. }) {
1964                Some(source_kind.clone())
1965            } else {
1966                None
1967            }
1968        }
1969        PersistedIndexExpressionOp::Date => {
1970            if matches!(
1971                source_kind,
1972                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1973            ) {
1974                Some(PersistedFieldKind::Date)
1975            } else {
1976                None
1977            }
1978        }
1979        PersistedIndexExpressionOp::Year
1980        | PersistedIndexExpressionOp::Month
1981        | PersistedIndexExpressionOp::Day => {
1982            if matches!(
1983                source_kind,
1984                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1985            ) {
1986                Some(PersistedFieldKind::Int)
1987            } else {
1988                None
1989            }
1990        }
1991    }
1992}
1993
1994fn validated_create_index_predicate_sql(
1995    predicate_sql: Option<&str>,
1996    schema: &SchemaInfo,
1997) -> Result<Option<String>, SqlDdlBindError> {
1998    let Some(predicate_sql) = predicate_sql else {
1999        return Ok(None);
2000    };
2001    let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
2002        SqlDdlBindError::InvalidFilteredIndexPredicate {
2003            detail: error.to_string(),
2004        }
2005    })?;
2006    validate_predicate(schema, &predicate).map_err(|error| {
2007        SqlDdlBindError::InvalidFilteredIndexPredicate {
2008            detail: error.to_string(),
2009        }
2010    })?;
2011
2012    Ok(Some(predicate_sql.to_string()))
2013}
2014
2015fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
2016    match field_paths {
2017        [field_path] => field_path.accepted_path().to_vec(),
2018        _ => vec![
2019            field_paths
2020                .iter()
2021                .map(|field_path| field_path.accepted_path().join("."))
2022                .collect::<Vec<_>>()
2023                .join(","),
2024        ],
2025    }
2026}
2027
2028fn ddl_key_item_report(key_items: &[BoundSqlDdlCreateIndexKey]) -> Vec<String> {
2029    match key_items {
2030        [key_item] => vec![ddl_key_item_text(key_item)],
2031        _ => vec![
2032            key_items
2033                .iter()
2034                .map(ddl_key_item_text)
2035                .collect::<Vec<_>>()
2036                .join(","),
2037        ],
2038    }
2039}
2040
2041fn ddl_key_item_text(key_item: &BoundSqlDdlCreateIndexKey) -> String {
2042    match key_item {
2043        BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.accepted_path().join("."),
2044        BoundSqlDdlCreateIndexKey::Expression(expression) => expression.canonical_sql().to_string(),
2045    }
2046}
2047
2048/// Lower one bound SQL DDL request through schema mutation admission.
2049pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
2050    request: &BoundSqlDdlRequest,
2051) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
2052    match request.statement() {
2053        BoundSqlDdlStatement::AddColumn(add) => {
2054            Ok(admit_sql_ddl_field_addition_candidate(add.field()))
2055        }
2056        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
2057            Ok(admit_sql_ddl_field_default_candidate(alter.field()))
2058        }
2059        BoundSqlDdlStatement::AlterColumnNullability(alter) => {
2060            Ok(admit_sql_ddl_field_nullability_candidate(alter.field()))
2061        }
2062        BoundSqlDdlStatement::DropColumn(drop) => {
2063            Ok(admit_sql_ddl_field_drop_candidate(drop.field()))
2064        }
2065        BoundSqlDdlStatement::RenameColumn(rename) => {
2066            let after = rename
2067                .field()
2068                .clone_with_name(rename.new_name().to_string());
2069            Ok(admit_sql_ddl_field_rename_candidate(rename.field(), &after))
2070        }
2071        BoundSqlDdlStatement::CreateIndex(create) => {
2072            if create.candidate_index().key().is_field_path_only() {
2073                admit_sql_ddl_field_path_index_candidate(create.candidate_index())
2074            } else {
2075                admit_sql_ddl_expression_index_candidate(create.candidate_index())
2076            }
2077        }
2078        BoundSqlDdlStatement::DropIndex(drop) => {
2079            admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
2080        }
2081        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
2082    }
2083    .map_err(SqlDdlLoweringError::MutationAdmission)
2084}
2085
2086/// Derive the accepted-after schema snapshot for one bound SQL DDL request.
2087pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
2088    accepted_before: &AcceptedSchemaSnapshot,
2089    request: &BoundSqlDdlRequest,
2090) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
2091    match request.statement() {
2092        BoundSqlDdlStatement::AddColumn(add) => {
2093            derive_sql_ddl_field_addition_accepted_after(accepted_before, add.field().clone())
2094        }
2095        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
2096            derive_sql_ddl_field_default_accepted_after(
2097                accepted_before,
2098                alter.field_name(),
2099                alter.default().clone(),
2100            )
2101        }
2102        BoundSqlDdlStatement::AlterColumnNullability(alter) => {
2103            derive_sql_ddl_field_nullability_accepted_after(
2104                accepted_before,
2105                alter.field_name(),
2106                alter.nullable(),
2107            )
2108        }
2109        BoundSqlDdlStatement::DropColumn(drop) => {
2110            derive_sql_ddl_field_drop_accepted_after(accepted_before, drop.field_name())
2111        }
2112        BoundSqlDdlStatement::RenameColumn(rename) => derive_sql_ddl_field_rename_accepted_after(
2113            accepted_before,
2114            rename.old_name(),
2115            rename.new_name(),
2116        ),
2117        BoundSqlDdlStatement::CreateIndex(create) => {
2118            if create.candidate_index().key().is_field_path_only() {
2119                derive_sql_ddl_field_path_index_accepted_after(
2120                    accepted_before,
2121                    create.candidate_index().clone(),
2122                )
2123            } else {
2124                derive_sql_ddl_expression_index_accepted_after(
2125                    accepted_before,
2126                    create.candidate_index().clone(),
2127                )
2128            }
2129        }
2130        BoundSqlDdlStatement::DropIndex(drop) => {
2131            derive_sql_ddl_secondary_index_drop_accepted_after(
2132                accepted_before,
2133                drop.dropped_index(),
2134            )
2135        }
2136        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
2137    }
2138    .map_err(SqlDdlLoweringError::MutationAdmission)
2139}
2140
2141fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
2142    match bound.statement() {
2143        BoundSqlDdlStatement::AddColumn(add) => SqlDdlPreparationReport {
2144            mutation_kind: if add.field().default().is_none() {
2145                SqlDdlMutationKind::AddNullableField
2146            } else {
2147                SqlDdlMutationKind::AddDefaultedField
2148            },
2149            target_index: add.field().name().to_string(),
2150            target_store: add.entity_name().to_string(),
2151            field_path: vec![add.field().name().to_string()],
2152            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2153            rows_scanned: 0,
2154            index_keys_written: 0,
2155        },
2156        BoundSqlDdlStatement::AlterColumnDefault(alter) => SqlDdlPreparationReport {
2157            mutation_kind: alter.mutation_kind(),
2158            target_index: alter.field_name().to_string(),
2159            target_store: alter.entity_name().to_string(),
2160            field_path: vec![alter.field_name().to_string()],
2161            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2162            rows_scanned: 0,
2163            index_keys_written: 0,
2164        },
2165        BoundSqlDdlStatement::AlterColumnNullability(alter) => SqlDdlPreparationReport {
2166            mutation_kind: alter.mutation_kind(),
2167            target_index: alter.field_name().to_string(),
2168            target_store: alter.entity_name().to_string(),
2169            field_path: vec![alter.field_name().to_string()],
2170            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2171            rows_scanned: 0,
2172            index_keys_written: 0,
2173        },
2174        BoundSqlDdlStatement::DropColumn(drop) => SqlDdlPreparationReport {
2175            mutation_kind: SqlDdlMutationKind::DropField,
2176            target_index: drop.field_name().to_string(),
2177            target_store: drop.entity_name().to_string(),
2178            field_path: vec![drop.field_name().to_string()],
2179            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2180            rows_scanned: 0,
2181            index_keys_written: 0,
2182        },
2183        BoundSqlDdlStatement::RenameColumn(rename) => SqlDdlPreparationReport {
2184            mutation_kind: SqlDdlMutationKind::RenameField,
2185            target_index: rename.new_name().to_string(),
2186            target_store: rename.entity_name().to_string(),
2187            field_path: vec![rename.old_name().to_string(), rename.new_name().to_string()],
2188            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2189            rows_scanned: 0,
2190            index_keys_written: 0,
2191        },
2192        BoundSqlDdlStatement::CreateIndex(create) => {
2193            let target = create.candidate_index();
2194
2195            SqlDdlPreparationReport {
2196                mutation_kind: if target.key().is_field_path_only() {
2197                    SqlDdlMutationKind::AddFieldPathIndex
2198                } else {
2199                    SqlDdlMutationKind::AddExpressionIndex
2200                },
2201                target_index: target.name().to_string(),
2202                target_store: target.store().to_string(),
2203                field_path: ddl_key_item_report(create.key_items()),
2204                execution_status: SqlDdlExecutionStatus::PreparedOnly,
2205                rows_scanned: 0,
2206                index_keys_written: 0,
2207            }
2208        }
2209        BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
2210            mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
2211            target_index: drop.index_name().to_string(),
2212            target_store: drop.dropped_index().store().to_string(),
2213            field_path: drop.field_path().to_vec(),
2214            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2215            rows_scanned: 0,
2216            index_keys_written: 0,
2217        },
2218        BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
2219            mutation_kind: no_op.mutation_kind(),
2220            target_index: no_op.index_name().to_string(),
2221            target_store: no_op.target_store().to_string(),
2222            field_path: no_op.field_path().to_vec(),
2223            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2224            rows_scanned: 0,
2225            index_keys_written: 0,
2226        },
2227    }
2228}