Skip to main content

icydb_core/db/sql/
ddl.rs

1//! Module: db::sql::ddl
2//! Responsibility: bind parsed SQL DDL to accepted schema catalog contracts.
3//! Does not own: mutation planning, physical index rebuilds, or SQL execution.
4//! Boundary: translates parser-owned DDL syntax into catalog-native requests.
5
6#![allow(
7    dead_code,
8    reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
9)]
10
11use crate::db::{
12    data::encode_runtime_value_for_accepted_field_contract,
13    predicate::parse_sql_predicate,
14    query::predicate::validate_predicate,
15    schema::{
16        AcceptedFieldDecodeContract, AcceptedSchemaSnapshot, FieldId, PersistedFieldKind,
17        PersistedFieldOrigin, PersistedFieldSnapshot, PersistedIndexExpressionOp,
18        PersistedIndexExpressionSnapshot, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
19        PersistedIndexSnapshot, SchemaDdlAcceptedSnapshotDerivation,
20        SchemaDdlIndexDropCandidateError, SchemaDdlMutationAdmission,
21        SchemaDdlMutationAdmissionError, SchemaExpressionIndexInfo,
22        SchemaExpressionIndexKeyItemInfo, SchemaFieldDefault, SchemaFieldSlot,
23        SchemaFieldWritePolicy, SchemaInfo, admit_sql_ddl_expression_index_candidate,
24        admit_sql_ddl_field_addition_candidate, admit_sql_ddl_field_default_candidate,
25        admit_sql_ddl_field_nullability_candidate, admit_sql_ddl_field_path_index_candidate,
26        admit_sql_ddl_secondary_index_drop_candidate,
27        canonicalize_strict_sql_literal_for_persisted_kind,
28        derive_sql_ddl_expression_index_accepted_after,
29        derive_sql_ddl_field_addition_accepted_after, derive_sql_ddl_field_default_accepted_after,
30        derive_sql_ddl_field_nullability_accepted_after,
31        derive_sql_ddl_field_path_index_accepted_after,
32        derive_sql_ddl_secondary_index_drop_accepted_after,
33        resolve_sql_ddl_secondary_index_drop_candidate,
34    },
35    sql::{
36        identifier::identifiers_tail_match,
37        parser::{
38            SqlAlterColumnAction, SqlAlterTableAddColumnStatement,
39            SqlAlterTableAlterColumnStatement, SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem,
40            SqlCreateIndexStatement, SqlCreateIndexUniqueness, SqlDdlStatement,
41            SqlDropIndexStatement, SqlStatement,
42        },
43    },
44};
45use crate::model::field::{FieldStorageDecode, LeafCodec, ScalarCodec};
46use thiserror::Error as ThisError;
47
48///
49/// PreparedSqlDdlCommand
50///
51/// Fully prepared SQL DDL command. This is intentionally not executable yet:
52/// it packages the accepted-catalog binding, accepted-after derivation, and
53/// schema mutation admission proof for the future execution boundary.
54///
55#[derive(Clone, Debug, Eq, PartialEq)]
56pub(in crate::db) struct PreparedSqlDdlCommand {
57    bound: BoundSqlDdlRequest,
58    derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
59    report: SqlDdlPreparationReport,
60}
61
62impl PreparedSqlDdlCommand {
63    /// Borrow the accepted-catalog-bound DDL request.
64    #[must_use]
65    pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
66        &self.bound
67    }
68
69    /// Borrow the accepted-after derivation proof.
70    #[must_use]
71    pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
72        self.derivation.as_ref()
73    }
74
75    /// Borrow the developer-facing preparation report.
76    #[must_use]
77    pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
78        &self.report
79    }
80
81    /// Return whether this prepared command needs schema or storage mutation.
82    #[must_use]
83    pub(in crate::db) const fn mutates_schema(&self) -> bool {
84        self.derivation.is_some()
85    }
86}
87
88///
89/// SqlDdlPreparationReport
90///
91/// Compact report for a DDL command that has passed all pre-execution
92/// frontend and schema-mutation checks.
93///
94#[derive(Clone, Debug, Eq, PartialEq)]
95pub struct SqlDdlPreparationReport {
96    mutation_kind: SqlDdlMutationKind,
97    target_index: String,
98    target_store: String,
99    field_path: Vec<String>,
100    execution_status: SqlDdlExecutionStatus,
101    rows_scanned: usize,
102    index_keys_written: usize,
103}
104
105impl SqlDdlPreparationReport {
106    /// Return the prepared DDL mutation kind.
107    #[must_use]
108    pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
109        self.mutation_kind
110    }
111
112    /// Borrow the target accepted index name.
113    #[must_use]
114    pub const fn target_index(&self) -> &str {
115        self.target_index.as_str()
116    }
117
118    /// Borrow the target accepted index store path.
119    #[must_use]
120    pub const fn target_store(&self) -> &str {
121        self.target_store.as_str()
122    }
123
124    /// Borrow the target field path.
125    #[must_use]
126    pub const fn field_path(&self) -> &[String] {
127        self.field_path.as_slice()
128    }
129
130    /// Return the execution status captured by this DDL report.
131    #[must_use]
132    pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
133        self.execution_status
134    }
135
136    /// Return rows scanned by DDL execution.
137    #[must_use]
138    pub const fn rows_scanned(&self) -> usize {
139        self.rows_scanned
140    }
141
142    /// Return index keys written by DDL execution.
143    #[must_use]
144    pub const fn index_keys_written(&self) -> usize {
145        self.index_keys_written
146    }
147
148    pub(in crate::db) const fn with_execution_status(
149        mut self,
150        execution_status: SqlDdlExecutionStatus,
151    ) -> Self {
152        self.execution_status = execution_status;
153        self
154    }
155
156    pub(in crate::db) const fn with_execution_metrics(
157        mut self,
158        rows_scanned: usize,
159        index_keys_written: usize,
160    ) -> Self {
161        self.rows_scanned = rows_scanned;
162        self.index_keys_written = index_keys_written;
163        self
164    }
165}
166
167///
168/// SqlDdlMutationKind
169///
170/// Developer-facing SQL DDL mutation kind.
171///
172#[derive(Clone, Copy, Debug, Eq, PartialEq)]
173pub enum SqlDdlMutationKind {
174    AddDefaultedField,
175    AddNullableField,
176    SetFieldDefault,
177    DropFieldDefault,
178    SetFieldNotNull,
179    DropFieldNotNull,
180    AddFieldPathIndex,
181    AddExpressionIndex,
182    DropSecondaryIndex,
183}
184
185impl SqlDdlMutationKind {
186    /// Return the stable diagnostic label for this DDL mutation kind.
187    #[must_use]
188    pub const fn as_str(self) -> &'static str {
189        match self {
190            Self::AddDefaultedField => "add_defaulted_field",
191            Self::AddNullableField => "add_nullable_field",
192            Self::SetFieldDefault => "set_field_default",
193            Self::DropFieldDefault => "drop_field_default",
194            Self::SetFieldNotNull => "set_field_not_null",
195            Self::DropFieldNotNull => "drop_field_not_null",
196            Self::AddFieldPathIndex => "add_field_path_index",
197            Self::AddExpressionIndex => "add_expression_index",
198            Self::DropSecondaryIndex => "drop_secondary_index",
199        }
200    }
201}
202
203///
204/// SqlDdlExecutionStatus
205///
206/// SQL DDL execution state at the current boundary.
207///
208#[derive(Clone, Copy, Debug, Eq, PartialEq)]
209pub enum SqlDdlExecutionStatus {
210    PreparedOnly,
211    Published,
212    NoOp,
213}
214
215impl SqlDdlExecutionStatus {
216    /// Return the stable diagnostic label for this execution status.
217    #[must_use]
218    pub const fn as_str(self) -> &'static str {
219        match self {
220            Self::PreparedOnly => "prepared_only",
221            Self::Published => "published",
222            Self::NoOp => "no_op",
223        }
224    }
225}
226
227///
228/// BoundSqlDdlRequest
229///
230/// Accepted-catalog SQL DDL request after parser syntax has been resolved
231/// against one runtime schema snapshot.
232///
233#[derive(Clone, Debug, Eq, PartialEq)]
234pub(in crate::db) struct BoundSqlDdlRequest {
235    statement: BoundSqlDdlStatement,
236}
237
238impl BoundSqlDdlRequest {
239    /// Borrow the bound statement payload.
240    #[must_use]
241    pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
242        &self.statement
243    }
244}
245
246///
247/// BoundSqlDdlStatement
248///
249/// Catalog-resolved DDL statement vocabulary.
250///
251#[derive(Clone, Debug, Eq, PartialEq)]
252pub(in crate::db) enum BoundSqlDdlStatement {
253    AddColumn(BoundSqlAddColumnRequest),
254    AlterColumnDefault(BoundSqlAlterColumnDefaultRequest),
255    AlterColumnNullability(BoundSqlAlterColumnNullabilityRequest),
256    CreateIndex(BoundSqlCreateIndexRequest),
257    DropIndex(BoundSqlDropIndexRequest),
258    NoOp(BoundSqlDdlNoOpRequest),
259}
260
261///
262/// BoundSqlAddColumnRequest
263///
264/// Catalog-resolved additive field DDL request.
265///
266#[derive(Clone, Debug, Eq, PartialEq)]
267pub(in crate::db) struct BoundSqlAddColumnRequest {
268    entity_name: String,
269    field: PersistedFieldSnapshot,
270}
271
272impl BoundSqlAddColumnRequest {
273    /// Borrow the accepted entity name.
274    #[must_use]
275    pub(in crate::db) const fn entity_name(&self) -> &str {
276        self.entity_name.as_str()
277    }
278
279    /// Borrow the accepted DDL-owned field snapshot to publish.
280    #[must_use]
281    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
282        &self.field
283    }
284}
285
286///
287/// BoundSqlAlterColumnDefaultRequest
288///
289/// Catalog-resolved field-default metadata DDL request.
290///
291#[derive(Clone, Debug, Eq, PartialEq)]
292pub(in crate::db) struct BoundSqlAlterColumnDefaultRequest {
293    entity_name: String,
294    field: PersistedFieldSnapshot,
295    default: SchemaFieldDefault,
296    mutation_kind: SqlDdlMutationKind,
297}
298
299impl BoundSqlAlterColumnDefaultRequest {
300    /// Borrow the accepted entity name.
301    #[must_use]
302    pub(in crate::db) const fn entity_name(&self) -> &str {
303        self.entity_name.as_str()
304    }
305
306    /// Borrow the accepted field name.
307    #[must_use]
308    pub(in crate::db) const fn field_name(&self) -> &str {
309        self.field.name()
310    }
311
312    /// Borrow the accepted field whose default will change.
313    #[must_use]
314    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
315        &self.field
316    }
317
318    /// Borrow the default contract to publish.
319    #[must_use]
320    pub(in crate::db) const fn default(&self) -> &SchemaFieldDefault {
321        &self.default
322    }
323
324    /// Return the field-default mutation kind.
325    #[must_use]
326    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
327        self.mutation_kind
328    }
329}
330
331///
332/// BoundSqlAlterColumnNullabilityRequest
333///
334/// Catalog-resolved field-nullability metadata DDL request.
335///
336#[derive(Clone, Debug, Eq, PartialEq)]
337pub(in crate::db) struct BoundSqlAlterColumnNullabilityRequest {
338    entity_name: String,
339    field: PersistedFieldSnapshot,
340    nullable: bool,
341    mutation_kind: SqlDdlMutationKind,
342}
343
344impl BoundSqlAlterColumnNullabilityRequest {
345    /// Borrow the accepted entity name.
346    #[must_use]
347    pub(in crate::db) const fn entity_name(&self) -> &str {
348        self.entity_name.as_str()
349    }
350
351    /// Borrow the accepted field name.
352    #[must_use]
353    pub(in crate::db) const fn field_name(&self) -> &str {
354        self.field.name()
355    }
356
357    /// Borrow the accepted field whose nullability will change.
358    #[must_use]
359    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
360        &self.field
361    }
362
363    /// Return the nullable contract to publish.
364    #[must_use]
365    pub(in crate::db) const fn nullable(&self) -> bool {
366        self.nullable
367    }
368
369    /// Return the field-nullability mutation kind.
370    #[must_use]
371    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
372        self.mutation_kind
373    }
374}
375
376///
377/// BoundSqlDdlNoOpRequest
378///
379/// Catalog-resolved idempotent DDL request that is already satisfied or absent.
380///
381#[derive(Clone, Debug, Eq, PartialEq)]
382pub(in crate::db) struct BoundSqlDdlNoOpRequest {
383    mutation_kind: SqlDdlMutationKind,
384    index_name: String,
385    entity_name: String,
386    target_store: String,
387    field_path: Vec<String>,
388}
389
390impl BoundSqlDdlNoOpRequest {
391    /// Return the user-facing mutation family this no-op belongs to.
392    #[must_use]
393    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
394        self.mutation_kind
395    }
396
397    /// Borrow the requested index name.
398    #[must_use]
399    pub(in crate::db) const fn index_name(&self) -> &str {
400        self.index_name.as_str()
401    }
402
403    /// Borrow the accepted entity name that owns this request.
404    #[must_use]
405    pub(in crate::db) const fn entity_name(&self) -> &str {
406        self.entity_name.as_str()
407    }
408
409    /// Borrow the accepted index store path, or `-` when no target exists.
410    #[must_use]
411    pub(in crate::db) const fn target_store(&self) -> &str {
412        self.target_store.as_str()
413    }
414
415    /// Borrow the target field path, empty when no target exists.
416    #[must_use]
417    pub(in crate::db) const fn field_path(&self) -> &[String] {
418        self.field_path.as_slice()
419    }
420}
421
422///
423/// BoundSqlCreateIndexRequest
424///
425/// Catalog-resolved request for adding one secondary index.
426///
427#[derive(Clone, Debug, Eq, PartialEq)]
428pub(in crate::db) struct BoundSqlCreateIndexRequest {
429    index_name: String,
430    entity_name: String,
431    key_items: Vec<BoundSqlDdlCreateIndexKey>,
432    field_paths: Vec<BoundSqlDdlFieldPath>,
433    candidate_index: PersistedIndexSnapshot,
434}
435
436impl BoundSqlCreateIndexRequest {
437    /// Borrow the requested index name.
438    #[must_use]
439    pub(in crate::db) const fn index_name(&self) -> &str {
440        self.index_name.as_str()
441    }
442
443    /// Borrow the accepted entity name that owns this request.
444    #[must_use]
445    pub(in crate::db) const fn entity_name(&self) -> &str {
446        self.entity_name.as_str()
447    }
448
449    /// Borrow the accepted field-path targets.
450    #[must_use]
451    pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
452        self.field_paths.as_slice()
453    }
454
455    /// Borrow the accepted key targets in DDL key order.
456    #[must_use]
457    pub(in crate::db) const fn key_items(&self) -> &[BoundSqlDdlCreateIndexKey] {
458        self.key_items.as_slice()
459    }
460
461    /// Borrow the candidate accepted index snapshot for mutation admission.
462    #[must_use]
463    pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
464        &self.candidate_index
465    }
466}
467
468///
469/// BoundSqlDropIndexRequest
470///
471/// Catalog-resolved request for dropping one DDL-published secondary index.
472///
473#[derive(Clone, Debug, Eq, PartialEq)]
474pub(in crate::db) struct BoundSqlDropIndexRequest {
475    index_name: String,
476    entity_name: String,
477    dropped_index: PersistedIndexSnapshot,
478    field_path: Vec<String>,
479}
480
481impl BoundSqlDropIndexRequest {
482    /// Borrow the requested index name.
483    #[must_use]
484    pub(in crate::db) const fn index_name(&self) -> &str {
485        self.index_name.as_str()
486    }
487
488    /// Borrow the accepted entity name that owns this request.
489    #[must_use]
490    pub(in crate::db) const fn entity_name(&self) -> &str {
491        self.entity_name.as_str()
492    }
493
494    /// Borrow the accepted index snapshot that will be removed.
495    #[must_use]
496    pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
497        &self.dropped_index
498    }
499
500    /// Borrow the dropped field-path target.
501    #[must_use]
502    pub(in crate::db) const fn field_path(&self) -> &[String] {
503        self.field_path.as_slice()
504    }
505}
506
507///
508/// BoundSqlDdlFieldPath
509///
510/// Accepted field-path target for SQL DDL binding.
511///
512#[derive(Clone, Debug, Eq, PartialEq)]
513pub(in crate::db) struct BoundSqlDdlFieldPath {
514    root: String,
515    segments: Vec<String>,
516    accepted_path: Vec<String>,
517}
518
519impl BoundSqlDdlFieldPath {
520    /// Borrow the top-level field name.
521    #[must_use]
522    pub(in crate::db) const fn root(&self) -> &str {
523        self.root.as_str()
524    }
525
526    /// Borrow nested path segments below the top-level field.
527    #[must_use]
528    pub(in crate::db) const fn segments(&self) -> &[String] {
529        self.segments.as_slice()
530    }
531
532    /// Borrow the full accepted field path used by index metadata.
533    #[must_use]
534    pub(in crate::db) const fn accepted_path(&self) -> &[String] {
535        self.accepted_path.as_slice()
536    }
537}
538
539///
540/// SqlDdlBindError
541///
542/// Typed fail-closed reasons for SQL DDL catalog binding.
543///
544#[derive(Debug, Eq, PartialEq, ThisError)]
545pub(in crate::db) enum SqlDdlBindError {
546    #[error("SQL DDL binder requires a DDL statement")]
547    NotDdl,
548
549    #[error("accepted schema does not expose an entity name")]
550    MissingEntityName,
551
552    #[error("accepted schema does not expose an entity path")]
553    MissingEntityPath,
554
555    #[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
556    EntityMismatch {
557        sql_entity: String,
558        expected_entity: String,
559    },
560
561    #[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
562    UnknownFieldPath {
563        entity_name: String,
564        field_path: String,
565    },
566
567    #[error("field path '{field_path}' is not indexable")]
568    FieldPathNotIndexable { field_path: String },
569
570    #[error("field path '{field_path}' depends on generated-only metadata")]
571    FieldPathNotAcceptedCatalogBacked { field_path: String },
572
573    #[error("invalid filtered index predicate: {detail}")]
574    InvalidFilteredIndexPredicate { detail: String },
575
576    #[error("index name '{index_name}' already exists in the accepted schema")]
577    DuplicateIndexName { index_name: String },
578
579    #[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
580    DuplicateFieldPathIndex {
581        field_path: String,
582        existing_index: String,
583    },
584
585    #[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
586    UnknownIndex {
587        entity_name: String,
588        index_name: String,
589    },
590
591    #[error(
592        "index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
593    )]
594    GeneratedIndexDropRejected { index_name: String },
595
596    #[error(
597        "index '{index_name}' is not a supported DDL-droppable secondary index; SQL DDL can currently drop only indexes created through SQL DDL"
598    )]
599    UnsupportedDropIndex { index_name: String },
600
601    #[error(
602        "SQL DDL ALTER TABLE ADD COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
603    )]
604    UnsupportedAlterTableAddColumn {
605        entity_name: String,
606        column_name: String,
607    },
608
609    #[error(
610        "SQL DDL ALTER TABLE ADD COLUMN DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
611    )]
612    InvalidAlterTableAddColumnDefault {
613        entity_name: String,
614        column_name: String,
615        detail: String,
616    },
617
618    #[error(
619        "SQL DDL ALTER TABLE ADD COLUMN NOT NULL is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
620    )]
621    UnsupportedAlterTableAddColumnNotNull {
622        entity_name: String,
623        column_name: String,
624    },
625
626    #[error("field '{column_name}' already exists in accepted entity '{entity_name}'")]
627    DuplicateColumn {
628        entity_name: String,
629        column_name: String,
630    },
631
632    #[error(
633        "SQL DDL ALTER TABLE ADD COLUMN type '{column_type}' is not supported yet for accepted entity '{entity_name}' column '{column_name}'"
634    )]
635    UnsupportedAlterTableAddColumnType {
636        entity_name: String,
637        column_name: String,
638        column_type: String,
639    },
640
641    #[error("unknown column '{column_name}' for accepted entity '{entity_name}'")]
642    UnknownColumn {
643        entity_name: String,
644        column_name: String,
645    },
646
647    #[error(
648        "SQL DDL ALTER TABLE ALTER COLUMN {action} is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
649    )]
650    UnsupportedAlterTableAlterColumn {
651        entity_name: String,
652        column_name: String,
653        action: String,
654    },
655
656    #[error(
657        "SQL DDL ALTER TABLE ALTER COLUMN SET DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
658    )]
659    InvalidAlterTableAlterColumnDefault {
660        entity_name: String,
661        column_name: String,
662        detail: String,
663    },
664
665    #[error(
666        "SQL DDL ALTER TABLE ALTER COLUMN DROP DEFAULT is not executable yet for required accepted entity '{entity_name}' column '{column_name}'"
667    )]
668    UnsupportedAlterTableDropDefaultRequired {
669        entity_name: String,
670        column_name: String,
671    },
672
673    #[error(
674        "SQL DDL ALTER TABLE ALTER COLUMN DEFAULT cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema default instead"
675    )]
676    GeneratedFieldDefaultChangeRejected {
677        entity_name: String,
678        column_name: String,
679    },
680
681    #[error(
682        "SQL DDL ALTER TABLE ALTER COLUMN NULLABILITY cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema nullability instead"
683    )]
684    GeneratedFieldNullabilityChangeRejected {
685        entity_name: String,
686        column_name: String,
687    },
688}
689
690///
691/// SqlDdlLoweringError
692///
693/// Typed fail-closed reasons while lowering bound DDL into schema mutation
694/// admission.
695///
696#[derive(Debug, Eq, PartialEq, ThisError)]
697pub(in crate::db) enum SqlDdlLoweringError {
698    #[error("SQL DDL lowering requires a supported DDL statement")]
699    UnsupportedStatement,
700
701    #[error("schema mutation admission rejected DDL candidate: {0:?}")]
702    MutationAdmission(SchemaDdlMutationAdmissionError),
703}
704
705///
706/// SqlDdlPrepareError
707///
708/// Typed fail-closed preparation errors for SQL DDL.
709///
710#[derive(Debug, Eq, PartialEq, ThisError)]
711pub(in crate::db) enum SqlDdlPrepareError {
712    #[error("{0}")]
713    Bind(#[from] SqlDdlBindError),
714
715    #[error("{0}")]
716    Lowering(#[from] SqlDdlLoweringError),
717}
718
719/// Prepare one parsed SQL DDL statement through every pre-execution proof.
720pub(in crate::db) fn prepare_sql_ddl_statement(
721    statement: &SqlStatement,
722    accepted_before: &AcceptedSchemaSnapshot,
723    schema: &SchemaInfo,
724    index_store_path: &'static str,
725) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
726    let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
727    let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
728        None
729    } else {
730        Some(derive_bound_sql_ddl_accepted_after(
731            accepted_before,
732            &bound,
733        )?)
734    };
735    let report = ddl_preparation_report(&bound);
736
737    Ok(PreparedSqlDdlCommand {
738        bound,
739        derivation,
740        report,
741    })
742}
743
744/// Bind one parsed SQL DDL statement against accepted catalog metadata.
745pub(in crate::db) fn bind_sql_ddl_statement(
746    statement: &SqlStatement,
747    accepted_before: &AcceptedSchemaSnapshot,
748    schema: &SchemaInfo,
749    index_store_path: &'static str,
750) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
751    let SqlStatement::Ddl(ddl) = statement else {
752        return Err(SqlDdlBindError::NotDdl);
753    };
754
755    match ddl {
756        SqlDdlStatement::CreateIndex(statement) => {
757            bind_create_index_statement(statement, schema, index_store_path)
758        }
759        SqlDdlStatement::DropIndex(statement) => {
760            bind_drop_index_statement(statement, accepted_before, schema)
761        }
762        SqlDdlStatement::AlterTableAddColumn(statement) => {
763            bind_alter_table_add_column_statement(statement, accepted_before, schema)
764        }
765        SqlDdlStatement::AlterTableAlterColumn(statement) => {
766            bind_alter_table_alter_column_statement(statement, accepted_before, schema)
767        }
768    }
769}
770
771fn bind_create_index_statement(
772    statement: &SqlCreateIndexStatement,
773    schema: &SchemaInfo,
774    index_store_path: &'static str,
775) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
776    let entity_name = schema
777        .entity_name()
778        .ok_or(SqlDdlBindError::MissingEntityName)?;
779
780    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
781        return Err(SqlDdlBindError::EntityMismatch {
782            sql_entity: statement.entity.clone(),
783            expected_entity: entity_name.to_string(),
784        });
785    }
786
787    let key_items = statement
788        .key_items
789        .iter()
790        .map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
791        .collect::<Result<Vec<_>, _>>()?;
792    let field_paths = create_index_field_path_report_items(key_items.as_slice());
793    if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
794        if key_items_are_field_path_only(key_items.as_slice())
795            && statement.if_not_exists
796            && existing_field_path_index_matches_request(
797                existing_index,
798                field_paths.as_slice(),
799                statement.predicate_sql.as_deref(),
800                statement.uniqueness,
801            )
802        {
803            return Ok(BoundSqlDdlRequest {
804                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
805                    mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
806                    index_name: statement.name.clone(),
807                    entity_name: entity_name.to_string(),
808                    target_store: existing_index.store().to_string(),
809                    field_path: ddl_field_path_report(field_paths.as_slice()),
810                }),
811            });
812        }
813
814        return Err(SqlDdlBindError::DuplicateIndexName {
815            index_name: statement.name.clone(),
816        });
817    }
818    let predicate_sql =
819        validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
820    if let Some(existing_index) = find_expression_index_by_name(schema, statement.name.as_str()) {
821        if statement.if_not_exists
822            && existing_expression_index_matches_request(
823                existing_index,
824                key_items.as_slice(),
825                predicate_sql.as_deref(),
826                statement.uniqueness,
827            )
828        {
829            return Ok(BoundSqlDdlRequest {
830                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
831                    mutation_kind: SqlDdlMutationKind::AddExpressionIndex,
832                    index_name: statement.name.clone(),
833                    entity_name: entity_name.to_string(),
834                    target_store: existing_index.store().to_string(),
835                    field_path: ddl_key_item_report(key_items.as_slice()),
836                }),
837            });
838        }
839
840        return Err(SqlDdlBindError::DuplicateIndexName {
841            index_name: statement.name.clone(),
842        });
843    }
844    if key_items_are_field_path_only(key_items.as_slice()) {
845        reject_duplicate_field_path_index(
846            field_paths.as_slice(),
847            predicate_sql.as_deref(),
848            schema,
849        )?;
850    } else {
851        reject_duplicate_expression_index(key_items.as_slice(), predicate_sql.as_deref(), schema)?;
852    }
853    let candidate_index = candidate_index_snapshot(
854        statement.name.as_str(),
855        key_items.as_slice(),
856        predicate_sql.as_deref(),
857        statement.uniqueness,
858        schema,
859        index_store_path,
860    )?;
861
862    Ok(BoundSqlDdlRequest {
863        statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
864            index_name: statement.name.clone(),
865            entity_name: entity_name.to_string(),
866            key_items,
867            field_paths,
868            candidate_index,
869        }),
870    })
871}
872
873fn bind_drop_index_statement(
874    statement: &SqlDropIndexStatement,
875    accepted_before: &AcceptedSchemaSnapshot,
876    schema: &SchemaInfo,
877) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
878    let entity_name = schema
879        .entity_name()
880        .ok_or(SqlDdlBindError::MissingEntityName)?;
881
882    if let Some(sql_entity) = statement.entity.as_deref()
883        && !identifiers_tail_match(sql_entity, entity_name)
884    {
885        return Err(SqlDdlBindError::EntityMismatch {
886            sql_entity: sql_entity.to_string(),
887            expected_entity: entity_name.to_string(),
888        });
889    }
890    let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
891        accepted_before,
892        &statement.name,
893    )
894    .map_err(|error| match error {
895        SchemaDdlIndexDropCandidateError::Generated => {
896            SqlDdlBindError::GeneratedIndexDropRejected {
897                index_name: statement.name.clone(),
898            }
899        }
900        SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
901            entity_name: entity_name.to_string(),
902            index_name: statement.name.clone(),
903        },
904        SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
905            index_name: statement.name.clone(),
906        },
907    });
908    let (dropped_index, field_path) = match drop_candidate {
909        Ok((dropped_index, field_path)) => (dropped_index, field_path),
910        Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
911            return Ok(BoundSqlDdlRequest {
912                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
913                    mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
914                    index_name: statement.name.clone(),
915                    entity_name: entity_name.to_string(),
916                    target_store: "-".to_string(),
917                    field_path: Vec::new(),
918                }),
919            });
920        }
921        Err(error) => return Err(error),
922    };
923    Ok(BoundSqlDdlRequest {
924        statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
925            index_name: statement.name.clone(),
926            entity_name: entity_name.to_string(),
927            dropped_index,
928            field_path,
929        }),
930    })
931}
932
933fn bind_alter_table_add_column_statement(
934    statement: &SqlAlterTableAddColumnStatement,
935    accepted_before: &AcceptedSchemaSnapshot,
936    schema: &SchemaInfo,
937) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
938    let entity_name = schema
939        .entity_name()
940        .ok_or(SqlDdlBindError::MissingEntityName)?;
941
942    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
943        return Err(SqlDdlBindError::EntityMismatch {
944            sql_entity: statement.entity.clone(),
945            expected_entity: entity_name.to_string(),
946        });
947    }
948
949    if schema
950        .field_nullable(statement.column_name.as_str())
951        .is_some()
952    {
953        return Err(SqlDdlBindError::DuplicateColumn {
954            entity_name: entity_name.to_string(),
955            column_name: statement.column_name.clone(),
956        });
957    }
958
959    let (kind, storage_decode, leaf_codec) = persisted_field_contract_for_sql_column_type(
960        statement.column_type.as_str(),
961    )
962    .ok_or_else(|| SqlDdlBindError::UnsupportedAlterTableAddColumnType {
963        entity_name: entity_name.to_string(),
964        column_name: statement.column_name.clone(),
965        column_type: statement.column_type.clone(),
966    })?;
967    let default = schema_field_default_for_sql_default(
968        entity_name,
969        statement.column_name.as_str(),
970        statement.default.as_ref(),
971        &kind,
972        statement.nullable,
973        storage_decode,
974        leaf_codec,
975    )?;
976    if !statement.nullable && default.is_none() {
977        return Err(SqlDdlBindError::UnsupportedAlterTableAddColumnNotNull {
978            entity_name: entity_name.to_string(),
979            column_name: statement.column_name.clone(),
980        });
981    }
982    let field = PersistedFieldSnapshot::new_with_write_policy_and_origin(
983        next_sql_ddl_field_id(accepted_before),
984        statement.column_name.clone(),
985        next_sql_ddl_field_slot(accepted_before),
986        kind,
987        Vec::new(),
988        statement.nullable,
989        default,
990        SchemaFieldWritePolicy::from_model_policies(None, None),
991        PersistedFieldOrigin::SqlDdl,
992        storage_decode,
993        leaf_codec,
994    );
995
996    Ok(BoundSqlDdlRequest {
997        statement: BoundSqlDdlStatement::AddColumn(BoundSqlAddColumnRequest {
998            entity_name: entity_name.to_string(),
999            field,
1000        }),
1001    })
1002}
1003
1004fn alter_table_alter_column_bind_error(
1005    statement: &SqlAlterTableAlterColumnStatement,
1006    schema: &SchemaInfo,
1007) -> SqlDdlBindError {
1008    let Some(entity_name) = schema.entity_name() else {
1009        return SqlDdlBindError::MissingEntityName;
1010    };
1011
1012    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1013        return SqlDdlBindError::EntityMismatch {
1014            sql_entity: statement.entity.clone(),
1015            expected_entity: entity_name.to_string(),
1016        };
1017    }
1018
1019    if schema
1020        .field_nullable(statement.column_name.as_str())
1021        .is_none()
1022    {
1023        return SqlDdlBindError::UnknownColumn {
1024            entity_name: entity_name.to_string(),
1025            column_name: statement.column_name.clone(),
1026        };
1027    }
1028
1029    SqlDdlBindError::UnsupportedAlterTableAlterColumn {
1030        entity_name: entity_name.to_string(),
1031        column_name: statement.column_name.clone(),
1032        action: statement.action.label().to_string(),
1033    }
1034}
1035
1036fn bind_alter_table_alter_column_statement(
1037    statement: &SqlAlterTableAlterColumnStatement,
1038    accepted_before: &AcceptedSchemaSnapshot,
1039    schema: &SchemaInfo,
1040) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1041    let Some(entity_name) = schema.entity_name() else {
1042        return Err(SqlDdlBindError::MissingEntityName);
1043    };
1044
1045    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1046        return Err(SqlDdlBindError::EntityMismatch {
1047            sql_entity: statement.entity.clone(),
1048            expected_entity: entity_name.to_string(),
1049        });
1050    }
1051
1052    let field = accepted_before
1053        .persisted_snapshot()
1054        .fields()
1055        .iter()
1056        .find(|field| field.name() == statement.column_name)
1057        .ok_or_else(|| SqlDdlBindError::UnknownColumn {
1058            entity_name: entity_name.to_string(),
1059            column_name: statement.column_name.clone(),
1060        })?;
1061
1062    match &statement.action {
1063        SqlAlterColumnAction::SetDefault(default) => {
1064            reject_generated_field_default_change(entity_name, field)?;
1065            let default =
1066                schema_field_default_for_alter_column_default(entity_name, field, default)?;
1067            Ok(bind_alter_table_alter_column_default(
1068                entity_name,
1069                field,
1070                default,
1071                SqlDdlMutationKind::SetFieldDefault,
1072            ))
1073        }
1074        SqlAlterColumnAction::DropDefault => {
1075            if !field.default().is_none() {
1076                reject_generated_field_default_change(entity_name, field)?;
1077            }
1078            if !field.nullable() && !field.default().is_none() {
1079                return Err(SqlDdlBindError::UnsupportedAlterTableDropDefaultRequired {
1080                    entity_name: entity_name.to_string(),
1081                    column_name: statement.column_name.clone(),
1082                });
1083            }
1084            Ok(bind_alter_table_alter_column_default(
1085                entity_name,
1086                field,
1087                SchemaFieldDefault::None,
1088                SqlDdlMutationKind::DropFieldDefault,
1089            ))
1090        }
1091        SqlAlterColumnAction::SetNotNull => Ok(bind_alter_table_alter_column_nullability(
1092            entity_name,
1093            field,
1094            false,
1095            SqlDdlMutationKind::SetFieldNotNull,
1096        )?),
1097        SqlAlterColumnAction::DropNotNull => Ok(bind_alter_table_alter_column_nullability(
1098            entity_name,
1099            field,
1100            true,
1101            SqlDdlMutationKind::DropFieldNotNull,
1102        )?),
1103    }
1104}
1105
1106fn bind_alter_table_alter_column_default(
1107    entity_name: &str,
1108    field: &PersistedFieldSnapshot,
1109    default: SchemaFieldDefault,
1110    mutation_kind: SqlDdlMutationKind,
1111) -> BoundSqlDdlRequest {
1112    if field.default() == &default {
1113        return BoundSqlDdlRequest {
1114            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1115                mutation_kind,
1116                index_name: field.name().to_string(),
1117                entity_name: entity_name.to_string(),
1118                target_store: entity_name.to_string(),
1119                field_path: vec![field.name().to_string()],
1120            }),
1121        };
1122    }
1123
1124    BoundSqlDdlRequest {
1125        statement: BoundSqlDdlStatement::AlterColumnDefault(BoundSqlAlterColumnDefaultRequest {
1126            entity_name: entity_name.to_string(),
1127            field: field.clone(),
1128            default,
1129            mutation_kind,
1130        }),
1131    }
1132}
1133
1134fn reject_generated_field_default_change(
1135    entity_name: &str,
1136    field: &PersistedFieldSnapshot,
1137) -> Result<(), SqlDdlBindError> {
1138    if field.generated() {
1139        return Err(SqlDdlBindError::GeneratedFieldDefaultChangeRejected {
1140            entity_name: entity_name.to_string(),
1141            column_name: field.name().to_string(),
1142        });
1143    }
1144
1145    Ok(())
1146}
1147
1148fn bind_alter_table_alter_column_nullability(
1149    entity_name: &str,
1150    field: &PersistedFieldSnapshot,
1151    nullable: bool,
1152    mutation_kind: SqlDdlMutationKind,
1153) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1154    if field.nullable() == nullable {
1155        return Ok(BoundSqlDdlRequest {
1156            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1157                mutation_kind,
1158                index_name: field.name().to_string(),
1159                entity_name: entity_name.to_string(),
1160                target_store: entity_name.to_string(),
1161                field_path: vec![field.name().to_string()],
1162            }),
1163        });
1164    }
1165
1166    reject_generated_field_nullability_change(entity_name, field)?;
1167
1168    Ok(BoundSqlDdlRequest {
1169        statement: BoundSqlDdlStatement::AlterColumnNullability(
1170            BoundSqlAlterColumnNullabilityRequest {
1171                entity_name: entity_name.to_string(),
1172                field: field.clone(),
1173                nullable,
1174                mutation_kind,
1175            },
1176        ),
1177    })
1178}
1179
1180fn reject_generated_field_nullability_change(
1181    entity_name: &str,
1182    field: &PersistedFieldSnapshot,
1183) -> Result<(), SqlDdlBindError> {
1184    if field.generated() {
1185        return Err(SqlDdlBindError::GeneratedFieldNullabilityChangeRejected {
1186            entity_name: entity_name.to_string(),
1187            column_name: field.name().to_string(),
1188        });
1189    }
1190
1191    Ok(())
1192}
1193
1194fn schema_field_default_for_sql_default(
1195    entity_name: &str,
1196    column_name: &str,
1197    default: Option<&crate::value::Value>,
1198    kind: &PersistedFieldKind,
1199    nullable: bool,
1200    storage_decode: FieldStorageDecode,
1201    leaf_codec: LeafCodec,
1202) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1203    let Some(default) = default else {
1204        return Ok(SchemaFieldDefault::None);
1205    };
1206    if matches!(default, crate::value::Value::Null) {
1207        return Err(SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1208            entity_name: entity_name.to_string(),
1209            column_name: column_name.to_string(),
1210            detail: "NULL cannot be used as an accepted database default".to_string(),
1211        });
1212    }
1213
1214    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(kind, default)
1215        .unwrap_or_else(|| default.clone());
1216    let contract =
1217        AcceptedFieldDecodeContract::new(column_name, kind, nullable, storage_decode, leaf_codec);
1218    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1219        |error| SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1220            entity_name: entity_name.to_string(),
1221            column_name: column_name.to_string(),
1222            detail: error.to_string(),
1223        },
1224    )?;
1225
1226    Ok(SchemaFieldDefault::SlotPayload(payload))
1227}
1228
1229fn schema_field_default_for_alter_column_default(
1230    entity_name: &str,
1231    field: &PersistedFieldSnapshot,
1232    default: &crate::value::Value,
1233) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1234    if matches!(default, crate::value::Value::Null) {
1235        return Err(SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1236            entity_name: entity_name.to_string(),
1237            column_name: field.name().to_string(),
1238            detail: "NULL cannot be used as an accepted database default".to_string(),
1239        });
1240    }
1241
1242    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(field.kind(), default)
1243        .unwrap_or_else(|| default.clone());
1244    let contract = AcceptedFieldDecodeContract::new(
1245        field.name(),
1246        field.kind(),
1247        field.nullable(),
1248        field.storage_decode(),
1249        field.leaf_codec(),
1250    );
1251    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1252        |error| SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1253            entity_name: entity_name.to_string(),
1254            column_name: field.name().to_string(),
1255            detail: error.to_string(),
1256        },
1257    )?;
1258
1259    Ok(SchemaFieldDefault::SlotPayload(payload))
1260}
1261
1262fn next_sql_ddl_field_id(accepted_before: &AcceptedSchemaSnapshot) -> FieldId {
1263    let next = accepted_before
1264        .persisted_snapshot()
1265        .fields()
1266        .iter()
1267        .map(|field| field.id().get())
1268        .max()
1269        .unwrap_or(0)
1270        .checked_add(1)
1271        .expect("accepted field IDs should not be exhausted");
1272
1273    FieldId::new(next)
1274}
1275
1276fn next_sql_ddl_field_slot(accepted_before: &AcceptedSchemaSnapshot) -> SchemaFieldSlot {
1277    let next = accepted_before
1278        .persisted_snapshot()
1279        .row_layout()
1280        .field_to_slot()
1281        .iter()
1282        .map(|(_, slot)| slot.get())
1283        .max()
1284        .unwrap_or(0)
1285        .checked_add(1)
1286        .expect("accepted row slots should not be exhausted");
1287
1288    SchemaFieldSlot::new(next)
1289}
1290
1291fn persisted_field_contract_for_sql_column_type(
1292    column_type: &str,
1293) -> Option<(PersistedFieldKind, FieldStorageDecode, LeafCodec)> {
1294    let normalized = column_type.trim().to_ascii_lowercase();
1295    match normalized.as_str() {
1296        "bool" | "boolean" => Some((
1297            PersistedFieldKind::Bool,
1298            FieldStorageDecode::ByKind,
1299            LeafCodec::Scalar(ScalarCodec::Bool),
1300        )),
1301        "int" | "integer" => Some((
1302            PersistedFieldKind::Int,
1303            FieldStorageDecode::ByKind,
1304            LeafCodec::Scalar(ScalarCodec::Int64),
1305        )),
1306        "nat" | "natural" => Some((
1307            PersistedFieldKind::Nat,
1308            FieldStorageDecode::ByKind,
1309            LeafCodec::Scalar(ScalarCodec::Nat64),
1310        )),
1311        "text" | "string" => Some((
1312            PersistedFieldKind::Text { max_len: None },
1313            FieldStorageDecode::ByKind,
1314            LeafCodec::Scalar(ScalarCodec::Text),
1315        )),
1316        _ => None,
1317    }
1318}
1319
1320#[derive(Clone, Debug, Eq, PartialEq)]
1321pub(in crate::db) enum BoundSqlDdlCreateIndexKey {
1322    FieldPath(BoundSqlDdlFieldPath),
1323    Expression(BoundSqlDdlExpressionKey),
1324}
1325
1326///
1327/// BoundSqlDdlExpressionKey
1328///
1329/// Accepted expression-index key target for SQL DDL binding.
1330///
1331#[derive(Clone, Debug, Eq, PartialEq)]
1332pub(in crate::db) struct BoundSqlDdlExpressionKey {
1333    op: PersistedIndexExpressionOp,
1334    source: BoundSqlDdlFieldPath,
1335    canonical_sql: String,
1336}
1337
1338impl BoundSqlDdlExpressionKey {
1339    /// Return the accepted expression operation.
1340    #[must_use]
1341    pub(in crate::db) const fn op(&self) -> PersistedIndexExpressionOp {
1342        self.op
1343    }
1344
1345    /// Borrow the accepted source field path.
1346    #[must_use]
1347    pub(in crate::db) const fn source(&self) -> &BoundSqlDdlFieldPath {
1348        &self.source
1349    }
1350
1351    /// Borrow the SQL-facing canonical expression text.
1352    #[must_use]
1353    pub(in crate::db) const fn canonical_sql(&self) -> &str {
1354        self.canonical_sql.as_str()
1355    }
1356}
1357
1358fn bind_create_index_key_item(
1359    key_item: &SqlCreateIndexKeyItem,
1360    entity_name: &str,
1361    schema: &SchemaInfo,
1362) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1363    match key_item {
1364        SqlCreateIndexKeyItem::FieldPath(field_path) => {
1365            bind_create_index_field_path(field_path.as_str(), entity_name, schema)
1366                .map(BoundSqlDdlCreateIndexKey::FieldPath)
1367        }
1368        SqlCreateIndexKeyItem::Expression(expression) => {
1369            bind_create_index_expression_key(expression, entity_name, schema)
1370        }
1371    }
1372}
1373
1374fn bind_create_index_expression_key(
1375    expression: &SqlCreateIndexExpressionKey,
1376    entity_name: &str,
1377    schema: &SchemaInfo,
1378) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1379    let source = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
1380
1381    Ok(BoundSqlDdlCreateIndexKey::Expression(
1382        BoundSqlDdlExpressionKey {
1383            op: expression_op_from_sql_function(expression.function),
1384            source,
1385            canonical_sql: expression.canonical_sql(),
1386        },
1387    ))
1388}
1389
1390const fn expression_op_from_sql_function(
1391    function: crate::db::sql::parser::SqlCreateIndexExpressionFunction,
1392) -> PersistedIndexExpressionOp {
1393    match function {
1394        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Lower => {
1395            PersistedIndexExpressionOp::Lower
1396        }
1397        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Upper => {
1398            PersistedIndexExpressionOp::Upper
1399        }
1400        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Trim => {
1401            PersistedIndexExpressionOp::Trim
1402        }
1403    }
1404}
1405
1406fn key_items_are_field_path_only(key_items: &[BoundSqlDdlCreateIndexKey]) -> bool {
1407    key_items
1408        .iter()
1409        .all(|key_item| matches!(key_item, BoundSqlDdlCreateIndexKey::FieldPath(_)))
1410}
1411
1412fn create_index_field_path_report_items(
1413    key_items: &[BoundSqlDdlCreateIndexKey],
1414) -> Vec<BoundSqlDdlFieldPath> {
1415    key_items
1416        .iter()
1417        .map(|key_item| match key_item {
1418            BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.clone(),
1419            BoundSqlDdlCreateIndexKey::Expression(expression) => expression.source().clone(),
1420        })
1421        .collect()
1422}
1423
1424fn bind_create_index_field_path(
1425    field_path: &str,
1426    entity_name: &str,
1427    schema: &SchemaInfo,
1428) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
1429    let mut path = field_path
1430        .split('.')
1431        .map(str::trim)
1432        .filter(|segment| !segment.is_empty());
1433    let Some(root) = path.next() else {
1434        return Err(SqlDdlBindError::UnknownFieldPath {
1435            entity_name: entity_name.to_string(),
1436            field_path: field_path.to_string(),
1437        });
1438    };
1439    let segments = path.map(str::to_string).collect::<Vec<_>>();
1440
1441    let capabilities = if segments.is_empty() {
1442        schema.sql_capabilities(root)
1443    } else {
1444        schema.nested_sql_capabilities(root, segments.as_slice())
1445    }
1446    .ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
1447        entity_name: entity_name.to_string(),
1448        field_path: field_path.to_string(),
1449    })?;
1450
1451    if !capabilities.orderable() {
1452        return Err(SqlDdlBindError::FieldPathNotIndexable {
1453            field_path: field_path.to_string(),
1454        });
1455    }
1456
1457    let mut accepted_path = Vec::with_capacity(segments.len() + 1);
1458    accepted_path.push(root.to_string());
1459    accepted_path.extend(segments.iter().cloned());
1460
1461    Ok(BoundSqlDdlFieldPath {
1462        root: root.to_string(),
1463        segments,
1464        accepted_path,
1465    })
1466}
1467
1468fn find_field_path_index_by_name<'a>(
1469    schema: &'a SchemaInfo,
1470    index_name: &str,
1471) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
1472    schema
1473        .field_path_indexes()
1474        .iter()
1475        .find(|index| index.name() == index_name)
1476}
1477
1478fn existing_field_path_index_matches_request(
1479    index: &crate::db::schema::SchemaIndexInfo,
1480    field_paths: &[BoundSqlDdlFieldPath],
1481    predicate_sql: Option<&str>,
1482    uniqueness: SqlCreateIndexUniqueness,
1483) -> bool {
1484    let fields = index.fields();
1485
1486    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1487        && index.predicate_sql() == predicate_sql
1488        && fields.len() == field_paths.len()
1489        && fields
1490            .iter()
1491            .zip(field_paths)
1492            .all(|(field, requested)| field.path() == requested.accepted_path())
1493}
1494
1495fn find_expression_index_by_name<'a>(
1496    schema: &'a SchemaInfo,
1497    index_name: &str,
1498) -> Option<&'a SchemaExpressionIndexInfo> {
1499    schema
1500        .expression_indexes()
1501        .iter()
1502        .find(|index| index.name() == index_name)
1503}
1504
1505fn existing_expression_index_matches_request(
1506    index: &SchemaExpressionIndexInfo,
1507    key_items: &[BoundSqlDdlCreateIndexKey],
1508    predicate_sql: Option<&str>,
1509    uniqueness: SqlCreateIndexUniqueness,
1510) -> bool {
1511    let existing_key_items = index.key_items();
1512
1513    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1514        && index.predicate_sql() == predicate_sql
1515        && existing_key_items.len() == key_items.len()
1516        && existing_key_items
1517            .iter()
1518            .zip(key_items)
1519            .all(existing_expression_key_item_matches_request)
1520}
1521
1522fn existing_expression_key_item_matches_request(
1523    existing: (
1524        &SchemaExpressionIndexKeyItemInfo,
1525        &BoundSqlDdlCreateIndexKey,
1526    ),
1527) -> bool {
1528    let (existing, requested) = existing;
1529    match (existing, requested) {
1530        (
1531            SchemaExpressionIndexKeyItemInfo::FieldPath(existing),
1532            BoundSqlDdlCreateIndexKey::FieldPath(requested),
1533        ) => existing.path() == requested.accepted_path(),
1534        (
1535            SchemaExpressionIndexKeyItemInfo::Expression(existing),
1536            BoundSqlDdlCreateIndexKey::Expression(requested),
1537        ) => existing_expression_component_matches_request(
1538            existing.op(),
1539            existing.source().path(),
1540            existing.canonical_text(),
1541            requested,
1542        ),
1543        _ => false,
1544    }
1545}
1546
1547fn existing_expression_component_matches_request(
1548    existing_op: PersistedIndexExpressionOp,
1549    existing_path: &[String],
1550    existing_canonical_text: &str,
1551    requested: &BoundSqlDdlExpressionKey,
1552) -> bool {
1553    let requested_path = requested.source().accepted_path();
1554    let requested_canonical_text = format!("expr:v1:{}", requested.canonical_sql());
1555
1556    existing_op == requested.op()
1557        && existing_path == requested_path
1558        && existing_canonical_text == requested_canonical_text
1559}
1560
1561fn reject_duplicate_expression_index(
1562    key_items: &[BoundSqlDdlCreateIndexKey],
1563    predicate_sql: Option<&str>,
1564    schema: &SchemaInfo,
1565) -> Result<(), SqlDdlBindError> {
1566    let Some(existing_index) = schema.expression_indexes().iter().find(|index| {
1567        existing_expression_index_matches_request(
1568            index,
1569            key_items,
1570            predicate_sql,
1571            if index.unique() {
1572                SqlCreateIndexUniqueness::Unique
1573            } else {
1574                SqlCreateIndexUniqueness::NonUnique
1575            },
1576        )
1577    }) else {
1578        return Ok(());
1579    };
1580
1581    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1582        field_path: ddl_key_item_report(key_items).join(","),
1583        existing_index: existing_index.name().to_string(),
1584    })
1585}
1586
1587fn reject_duplicate_field_path_index(
1588    field_paths: &[BoundSqlDdlFieldPath],
1589    predicate_sql: Option<&str>,
1590    schema: &SchemaInfo,
1591) -> Result<(), SqlDdlBindError> {
1592    let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
1593        let fields = index.fields();
1594        index.predicate_sql() == predicate_sql
1595            && fields.len() == field_paths.len()
1596            && fields
1597                .iter()
1598                .zip(field_paths)
1599                .all(|(field, requested)| field.path() == requested.accepted_path())
1600    }) else {
1601        return Ok(());
1602    };
1603
1604    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1605        field_path: ddl_field_path_report(field_paths).join(","),
1606        existing_index: existing_index.name().to_string(),
1607    })
1608}
1609
1610fn candidate_index_snapshot(
1611    index_name: &str,
1612    key_items: &[BoundSqlDdlCreateIndexKey],
1613    predicate_sql: Option<&str>,
1614    uniqueness: SqlCreateIndexUniqueness,
1615    schema: &SchemaInfo,
1616    index_store_path: &'static str,
1617) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
1618    let key = if key_items_are_field_path_only(key_items) {
1619        PersistedIndexKeySnapshot::FieldPath(
1620            key_items
1621                .iter()
1622                .map(|key_item| {
1623                    let BoundSqlDdlCreateIndexKey::FieldPath(field_path) = key_item else {
1624                        unreachable!("field-path-only index checked before field-path lowering");
1625                    };
1626
1627                    accepted_index_field_path_snapshot(schema, field_path)
1628                })
1629                .collect::<Result<Vec<_>, _>>()?,
1630        )
1631    } else {
1632        PersistedIndexKeySnapshot::Items(
1633            key_items
1634                .iter()
1635                .map(|key_item| match key_item {
1636                    BoundSqlDdlCreateIndexKey::FieldPath(field_path) => {
1637                        accepted_index_field_path_snapshot(schema, field_path)
1638                            .map(PersistedIndexKeyItemSnapshot::FieldPath)
1639                    }
1640                    BoundSqlDdlCreateIndexKey::Expression(expression) => {
1641                        accepted_index_expression_snapshot(schema, expression)
1642                    }
1643                })
1644                .collect::<Result<Vec<_>, _>>()?,
1645        )
1646    };
1647
1648    Ok(PersistedIndexSnapshot::new_sql_ddl(
1649        schema.next_secondary_index_ordinal(),
1650        index_name.to_string(),
1651        index_store_path.to_string(),
1652        matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
1653        key,
1654        predicate_sql.map(str::to_string),
1655    ))
1656}
1657
1658fn accepted_index_field_path_snapshot(
1659    schema: &SchemaInfo,
1660    field_path: &BoundSqlDdlFieldPath,
1661) -> Result<crate::db::schema::PersistedIndexFieldPathSnapshot, SqlDdlBindError> {
1662    schema
1663        .accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
1664        .ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
1665            field_path: field_path.accepted_path().join("."),
1666        })
1667}
1668
1669fn accepted_index_expression_snapshot(
1670    schema: &SchemaInfo,
1671    expression: &BoundSqlDdlExpressionKey,
1672) -> Result<PersistedIndexKeyItemSnapshot, SqlDdlBindError> {
1673    let source = accepted_index_field_path_snapshot(schema, expression.source())?;
1674    let Some(output_kind) = expression_output_kind(expression.op(), source.kind()) else {
1675        return Err(SqlDdlBindError::FieldPathNotIndexable {
1676            field_path: expression.source().accepted_path().join("."),
1677        });
1678    };
1679
1680    Ok(PersistedIndexKeyItemSnapshot::Expression(Box::new(
1681        PersistedIndexExpressionSnapshot::new(
1682            expression.op(),
1683            source.clone(),
1684            source.kind().clone(),
1685            output_kind,
1686            format!("expr:v1:{}", expression.canonical_sql()),
1687        ),
1688    )))
1689}
1690
1691fn expression_output_kind(
1692    op: PersistedIndexExpressionOp,
1693    source_kind: &PersistedFieldKind,
1694) -> Option<PersistedFieldKind> {
1695    match op {
1696        PersistedIndexExpressionOp::Lower
1697        | PersistedIndexExpressionOp::Upper
1698        | PersistedIndexExpressionOp::Trim
1699        | PersistedIndexExpressionOp::LowerTrim => {
1700            if matches!(source_kind, PersistedFieldKind::Text { .. }) {
1701                Some(source_kind.clone())
1702            } else {
1703                None
1704            }
1705        }
1706        PersistedIndexExpressionOp::Date => {
1707            if matches!(
1708                source_kind,
1709                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1710            ) {
1711                Some(PersistedFieldKind::Date)
1712            } else {
1713                None
1714            }
1715        }
1716        PersistedIndexExpressionOp::Year
1717        | PersistedIndexExpressionOp::Month
1718        | PersistedIndexExpressionOp::Day => {
1719            if matches!(
1720                source_kind,
1721                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1722            ) {
1723                Some(PersistedFieldKind::Int)
1724            } else {
1725                None
1726            }
1727        }
1728    }
1729}
1730
1731fn validated_create_index_predicate_sql(
1732    predicate_sql: Option<&str>,
1733    schema: &SchemaInfo,
1734) -> Result<Option<String>, SqlDdlBindError> {
1735    let Some(predicate_sql) = predicate_sql else {
1736        return Ok(None);
1737    };
1738    let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
1739        SqlDdlBindError::InvalidFilteredIndexPredicate {
1740            detail: error.to_string(),
1741        }
1742    })?;
1743    validate_predicate(schema, &predicate).map_err(|error| {
1744        SqlDdlBindError::InvalidFilteredIndexPredicate {
1745            detail: error.to_string(),
1746        }
1747    })?;
1748
1749    Ok(Some(predicate_sql.to_string()))
1750}
1751
1752fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
1753    match field_paths {
1754        [field_path] => field_path.accepted_path().to_vec(),
1755        _ => vec![
1756            field_paths
1757                .iter()
1758                .map(|field_path| field_path.accepted_path().join("."))
1759                .collect::<Vec<_>>()
1760                .join(","),
1761        ],
1762    }
1763}
1764
1765fn ddl_key_item_report(key_items: &[BoundSqlDdlCreateIndexKey]) -> Vec<String> {
1766    match key_items {
1767        [key_item] => vec![ddl_key_item_text(key_item)],
1768        _ => vec![
1769            key_items
1770                .iter()
1771                .map(ddl_key_item_text)
1772                .collect::<Vec<_>>()
1773                .join(","),
1774        ],
1775    }
1776}
1777
1778fn ddl_key_item_text(key_item: &BoundSqlDdlCreateIndexKey) -> String {
1779    match key_item {
1780        BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.accepted_path().join("."),
1781        BoundSqlDdlCreateIndexKey::Expression(expression) => expression.canonical_sql().to_string(),
1782    }
1783}
1784
1785/// Lower one bound SQL DDL request through schema mutation admission.
1786pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
1787    request: &BoundSqlDdlRequest,
1788) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
1789    match request.statement() {
1790        BoundSqlDdlStatement::AddColumn(add) => {
1791            Ok(admit_sql_ddl_field_addition_candidate(add.field()))
1792        }
1793        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
1794            Ok(admit_sql_ddl_field_default_candidate(alter.field()))
1795        }
1796        BoundSqlDdlStatement::AlterColumnNullability(alter) => {
1797            Ok(admit_sql_ddl_field_nullability_candidate(alter.field()))
1798        }
1799        BoundSqlDdlStatement::CreateIndex(create) => {
1800            if create.candidate_index().key().is_field_path_only() {
1801                admit_sql_ddl_field_path_index_candidate(create.candidate_index())
1802            } else {
1803                admit_sql_ddl_expression_index_candidate(create.candidate_index())
1804            }
1805        }
1806        BoundSqlDdlStatement::DropIndex(drop) => {
1807            admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
1808        }
1809        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1810    }
1811    .map_err(SqlDdlLoweringError::MutationAdmission)
1812}
1813
1814/// Derive the accepted-after schema snapshot for one bound SQL DDL request.
1815pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
1816    accepted_before: &AcceptedSchemaSnapshot,
1817    request: &BoundSqlDdlRequest,
1818) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
1819    match request.statement() {
1820        BoundSqlDdlStatement::AddColumn(add) => {
1821            derive_sql_ddl_field_addition_accepted_after(accepted_before, add.field().clone())
1822        }
1823        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
1824            derive_sql_ddl_field_default_accepted_after(
1825                accepted_before,
1826                alter.field_name(),
1827                alter.default().clone(),
1828            )
1829        }
1830        BoundSqlDdlStatement::AlterColumnNullability(alter) => {
1831            derive_sql_ddl_field_nullability_accepted_after(
1832                accepted_before,
1833                alter.field_name(),
1834                alter.nullable(),
1835            )
1836        }
1837        BoundSqlDdlStatement::CreateIndex(create) => {
1838            if create.candidate_index().key().is_field_path_only() {
1839                derive_sql_ddl_field_path_index_accepted_after(
1840                    accepted_before,
1841                    create.candidate_index().clone(),
1842                )
1843            } else {
1844                derive_sql_ddl_expression_index_accepted_after(
1845                    accepted_before,
1846                    create.candidate_index().clone(),
1847                )
1848            }
1849        }
1850        BoundSqlDdlStatement::DropIndex(drop) => {
1851            derive_sql_ddl_secondary_index_drop_accepted_after(
1852                accepted_before,
1853                drop.dropped_index(),
1854            )
1855        }
1856        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1857    }
1858    .map_err(SqlDdlLoweringError::MutationAdmission)
1859}
1860
1861fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
1862    match bound.statement() {
1863        BoundSqlDdlStatement::AddColumn(add) => SqlDdlPreparationReport {
1864            mutation_kind: if add.field().default().is_none() {
1865                SqlDdlMutationKind::AddNullableField
1866            } else {
1867                SqlDdlMutationKind::AddDefaultedField
1868            },
1869            target_index: add.field().name().to_string(),
1870            target_store: add.entity_name().to_string(),
1871            field_path: vec![add.field().name().to_string()],
1872            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1873            rows_scanned: 0,
1874            index_keys_written: 0,
1875        },
1876        BoundSqlDdlStatement::AlterColumnDefault(alter) => SqlDdlPreparationReport {
1877            mutation_kind: alter.mutation_kind(),
1878            target_index: alter.field_name().to_string(),
1879            target_store: alter.entity_name().to_string(),
1880            field_path: vec![alter.field_name().to_string()],
1881            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1882            rows_scanned: 0,
1883            index_keys_written: 0,
1884        },
1885        BoundSqlDdlStatement::AlterColumnNullability(alter) => SqlDdlPreparationReport {
1886            mutation_kind: alter.mutation_kind(),
1887            target_index: alter.field_name().to_string(),
1888            target_store: alter.entity_name().to_string(),
1889            field_path: vec![alter.field_name().to_string()],
1890            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1891            rows_scanned: 0,
1892            index_keys_written: 0,
1893        },
1894        BoundSqlDdlStatement::CreateIndex(create) => {
1895            let target = create.candidate_index();
1896
1897            SqlDdlPreparationReport {
1898                mutation_kind: if target.key().is_field_path_only() {
1899                    SqlDdlMutationKind::AddFieldPathIndex
1900                } else {
1901                    SqlDdlMutationKind::AddExpressionIndex
1902                },
1903                target_index: target.name().to_string(),
1904                target_store: target.store().to_string(),
1905                field_path: ddl_key_item_report(create.key_items()),
1906                execution_status: SqlDdlExecutionStatus::PreparedOnly,
1907                rows_scanned: 0,
1908                index_keys_written: 0,
1909            }
1910        }
1911        BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
1912            mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
1913            target_index: drop.index_name().to_string(),
1914            target_store: drop.dropped_index().store().to_string(),
1915            field_path: drop.field_path().to_vec(),
1916            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1917            rows_scanned: 0,
1918            index_keys_written: 0,
1919        },
1920        BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
1921            mutation_kind: no_op.mutation_kind(),
1922            target_index: no_op.index_name().to_string(),
1923            target_store: no_op.target_store().to_string(),
1924            field_path: no_op.field_path().to_vec(),
1925            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1926            rows_scanned: 0,
1927            index_keys_written: 0,
1928        },
1929    }
1930}