Skip to main content

icydb_core/db/sql/
ddl.rs

1//! Module: db::sql::ddl
2//! Responsibility: bind parsed SQL DDL to accepted schema catalog contracts.
3//! Does not own: mutation planning, physical index rebuilds, or SQL execution.
4//! Boundary: translates parser-owned DDL syntax into catalog-native requests.
5
6#![allow(
7    dead_code,
8    reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
9)]
10
11use crate::db::{
12    data::encode_runtime_value_for_accepted_field_contract,
13    predicate::parse_sql_predicate,
14    query::predicate::validate_predicate,
15    schema::{
16        AcceptedFieldDecodeContract, AcceptedSchemaSnapshot, FieldId, PersistedFieldKind,
17        PersistedFieldOrigin, PersistedFieldSnapshot, PersistedIndexExpressionOp,
18        PersistedIndexExpressionSnapshot, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
19        PersistedIndexSnapshot, SchemaDdlAcceptedSnapshotDerivation,
20        SchemaDdlIndexDropCandidateError, SchemaDdlMutationAdmission,
21        SchemaDdlMutationAdmissionError, SchemaExpressionIndexInfo,
22        SchemaExpressionIndexKeyItemInfo, SchemaFieldDefault, SchemaFieldSlot,
23        SchemaFieldWritePolicy, SchemaInfo, admit_sql_ddl_expression_index_candidate,
24        admit_sql_ddl_field_addition_candidate, admit_sql_ddl_field_default_candidate,
25        admit_sql_ddl_field_path_index_candidate, admit_sql_ddl_secondary_index_drop_candidate,
26        canonicalize_strict_sql_literal_for_persisted_kind,
27        derive_sql_ddl_expression_index_accepted_after,
28        derive_sql_ddl_field_addition_accepted_after, derive_sql_ddl_field_default_accepted_after,
29        derive_sql_ddl_field_path_index_accepted_after,
30        derive_sql_ddl_secondary_index_drop_accepted_after,
31        resolve_sql_ddl_secondary_index_drop_candidate,
32    },
33    sql::{
34        identifier::identifiers_tail_match,
35        parser::{
36            SqlAlterColumnAction, SqlAlterTableAddColumnStatement,
37            SqlAlterTableAlterColumnStatement, SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem,
38            SqlCreateIndexStatement, SqlCreateIndexUniqueness, SqlDdlStatement,
39            SqlDropIndexStatement, SqlStatement,
40        },
41    },
42};
43use crate::model::field::{FieldStorageDecode, LeafCodec, ScalarCodec};
44use thiserror::Error as ThisError;
45
46///
47/// PreparedSqlDdlCommand
48///
49/// Fully prepared SQL DDL command. This is intentionally not executable yet:
50/// it packages the accepted-catalog binding, accepted-after derivation, and
51/// schema mutation admission proof for the future execution boundary.
52///
53#[derive(Clone, Debug, Eq, PartialEq)]
54pub(in crate::db) struct PreparedSqlDdlCommand {
55    bound: BoundSqlDdlRequest,
56    derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
57    report: SqlDdlPreparationReport,
58}
59
60impl PreparedSqlDdlCommand {
61    /// Borrow the accepted-catalog-bound DDL request.
62    #[must_use]
63    pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
64        &self.bound
65    }
66
67    /// Borrow the accepted-after derivation proof.
68    #[must_use]
69    pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
70        self.derivation.as_ref()
71    }
72
73    /// Borrow the developer-facing preparation report.
74    #[must_use]
75    pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
76        &self.report
77    }
78
79    /// Return whether this prepared command needs schema or storage mutation.
80    #[must_use]
81    pub(in crate::db) const fn mutates_schema(&self) -> bool {
82        self.derivation.is_some()
83    }
84}
85
86///
87/// SqlDdlPreparationReport
88///
89/// Compact report for a DDL command that has passed all pre-execution
90/// frontend and schema-mutation checks.
91///
92#[derive(Clone, Debug, Eq, PartialEq)]
93pub struct SqlDdlPreparationReport {
94    mutation_kind: SqlDdlMutationKind,
95    target_index: String,
96    target_store: String,
97    field_path: Vec<String>,
98    execution_status: SqlDdlExecutionStatus,
99    rows_scanned: usize,
100    index_keys_written: usize,
101}
102
103impl SqlDdlPreparationReport {
104    /// Return the prepared DDL mutation kind.
105    #[must_use]
106    pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
107        self.mutation_kind
108    }
109
110    /// Borrow the target accepted index name.
111    #[must_use]
112    pub const fn target_index(&self) -> &str {
113        self.target_index.as_str()
114    }
115
116    /// Borrow the target accepted index store path.
117    #[must_use]
118    pub const fn target_store(&self) -> &str {
119        self.target_store.as_str()
120    }
121
122    /// Borrow the target field path.
123    #[must_use]
124    pub const fn field_path(&self) -> &[String] {
125        self.field_path.as_slice()
126    }
127
128    /// Return the execution status captured by this DDL report.
129    #[must_use]
130    pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
131        self.execution_status
132    }
133
134    /// Return rows scanned by DDL execution.
135    #[must_use]
136    pub const fn rows_scanned(&self) -> usize {
137        self.rows_scanned
138    }
139
140    /// Return index keys written by DDL execution.
141    #[must_use]
142    pub const fn index_keys_written(&self) -> usize {
143        self.index_keys_written
144    }
145
146    pub(in crate::db) const fn with_execution_status(
147        mut self,
148        execution_status: SqlDdlExecutionStatus,
149    ) -> Self {
150        self.execution_status = execution_status;
151        self
152    }
153
154    pub(in crate::db) const fn with_execution_metrics(
155        mut self,
156        rows_scanned: usize,
157        index_keys_written: usize,
158    ) -> Self {
159        self.rows_scanned = rows_scanned;
160        self.index_keys_written = index_keys_written;
161        self
162    }
163}
164
///
/// SqlDdlMutationKind
///
/// Developer-facing classification of the mutation a SQL DDL statement
/// requests.
///
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SqlDdlMutationKind {
    AddDefaultedField,
    AddNullableField,
    SetFieldDefault,
    DropFieldDefault,
    AddFieldPathIndex,
    AddExpressionIndex,
    DropSecondaryIndex,
}

impl SqlDdlMutationKind {
    /// Map the mutation kind to its stable snake_case diagnostic label.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        // Field mutations first, index mutations second; every arm is a
        // fixed literal so the label never allocates.
        match self {
            Self::AddDefaultedField => "add_defaulted_field",
            Self::AddNullableField => "add_nullable_field",
            Self::SetFieldDefault => "set_field_default",
            Self::DropFieldDefault => "drop_field_default",
            Self::AddFieldPathIndex => "add_field_path_index",
            Self::AddExpressionIndex => "add_expression_index",
            Self::DropSecondaryIndex => "drop_secondary_index",
        }
    }
}
196
///
/// SqlDdlExecutionStatus
///
/// State a SQL DDL command has reached at the current boundary.
///
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SqlDdlExecutionStatus {
    PreparedOnly,
    Published,
    NoOp,
}

impl SqlDdlExecutionStatus {
    /// Map the execution status to its stable snake_case diagnostic label.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::PreparedOnly => "prepared_only",
            Self::Published => "published",
            Self::NoOp => "no_op",
        }
    }
}
220
221///
222/// BoundSqlDdlRequest
223///
224/// Accepted-catalog SQL DDL request after parser syntax has been resolved
225/// against one runtime schema snapshot.
226///
227#[derive(Clone, Debug, Eq, PartialEq)]
228pub(in crate::db) struct BoundSqlDdlRequest {
229    statement: BoundSqlDdlStatement,
230}
231
232impl BoundSqlDdlRequest {
233    /// Borrow the bound statement payload.
234    #[must_use]
235    pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
236        &self.statement
237    }
238}
239
240///
241/// BoundSqlDdlStatement
242///
243/// Catalog-resolved DDL statement vocabulary.
244///
245#[derive(Clone, Debug, Eq, PartialEq)]
246pub(in crate::db) enum BoundSqlDdlStatement {
247    AddColumn(BoundSqlAddColumnRequest),
248    AlterColumnDefault(BoundSqlAlterColumnDefaultRequest),
249    CreateIndex(BoundSqlCreateIndexRequest),
250    DropIndex(BoundSqlDropIndexRequest),
251    NoOp(BoundSqlDdlNoOpRequest),
252}
253
254///
255/// BoundSqlAddColumnRequest
256///
257/// Catalog-resolved additive field DDL request.
258///
259#[derive(Clone, Debug, Eq, PartialEq)]
260pub(in crate::db) struct BoundSqlAddColumnRequest {
261    entity_name: String,
262    field: PersistedFieldSnapshot,
263}
264
265impl BoundSqlAddColumnRequest {
266    /// Borrow the accepted entity name.
267    #[must_use]
268    pub(in crate::db) const fn entity_name(&self) -> &str {
269        self.entity_name.as_str()
270    }
271
272    /// Borrow the accepted DDL-owned field snapshot to publish.
273    #[must_use]
274    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
275        &self.field
276    }
277}
278
279///
280/// BoundSqlAlterColumnDefaultRequest
281///
282/// Catalog-resolved field-default metadata DDL request.
283///
284#[derive(Clone, Debug, Eq, PartialEq)]
285pub(in crate::db) struct BoundSqlAlterColumnDefaultRequest {
286    entity_name: String,
287    field: PersistedFieldSnapshot,
288    default: SchemaFieldDefault,
289    mutation_kind: SqlDdlMutationKind,
290}
291
292impl BoundSqlAlterColumnDefaultRequest {
293    /// Borrow the accepted entity name.
294    #[must_use]
295    pub(in crate::db) const fn entity_name(&self) -> &str {
296        self.entity_name.as_str()
297    }
298
299    /// Borrow the accepted field name.
300    #[must_use]
301    pub(in crate::db) const fn field_name(&self) -> &str {
302        self.field.name()
303    }
304
305    /// Borrow the accepted field whose default will change.
306    #[must_use]
307    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
308        &self.field
309    }
310
311    /// Borrow the default contract to publish.
312    #[must_use]
313    pub(in crate::db) const fn default(&self) -> &SchemaFieldDefault {
314        &self.default
315    }
316
317    /// Return the field-default mutation kind.
318    #[must_use]
319    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
320        self.mutation_kind
321    }
322}
323
324///
325/// BoundSqlDdlNoOpRequest
326///
327/// Catalog-resolved idempotent DDL request that is already satisfied or absent.
328///
329#[derive(Clone, Debug, Eq, PartialEq)]
330pub(in crate::db) struct BoundSqlDdlNoOpRequest {
331    mutation_kind: SqlDdlMutationKind,
332    index_name: String,
333    entity_name: String,
334    target_store: String,
335    field_path: Vec<String>,
336}
337
338impl BoundSqlDdlNoOpRequest {
339    /// Return the user-facing mutation family this no-op belongs to.
340    #[must_use]
341    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
342        self.mutation_kind
343    }
344
345    /// Borrow the requested index name.
346    #[must_use]
347    pub(in crate::db) const fn index_name(&self) -> &str {
348        self.index_name.as_str()
349    }
350
351    /// Borrow the accepted entity name that owns this request.
352    #[must_use]
353    pub(in crate::db) const fn entity_name(&self) -> &str {
354        self.entity_name.as_str()
355    }
356
357    /// Borrow the accepted index store path, or `-` when no target exists.
358    #[must_use]
359    pub(in crate::db) const fn target_store(&self) -> &str {
360        self.target_store.as_str()
361    }
362
363    /// Borrow the target field path, empty when no target exists.
364    #[must_use]
365    pub(in crate::db) const fn field_path(&self) -> &[String] {
366        self.field_path.as_slice()
367    }
368}
369
370///
371/// BoundSqlCreateIndexRequest
372///
373/// Catalog-resolved request for adding one secondary index.
374///
375#[derive(Clone, Debug, Eq, PartialEq)]
376pub(in crate::db) struct BoundSqlCreateIndexRequest {
377    index_name: String,
378    entity_name: String,
379    key_items: Vec<BoundSqlDdlCreateIndexKey>,
380    field_paths: Vec<BoundSqlDdlFieldPath>,
381    candidate_index: PersistedIndexSnapshot,
382}
383
384impl BoundSqlCreateIndexRequest {
385    /// Borrow the requested index name.
386    #[must_use]
387    pub(in crate::db) const fn index_name(&self) -> &str {
388        self.index_name.as_str()
389    }
390
391    /// Borrow the accepted entity name that owns this request.
392    #[must_use]
393    pub(in crate::db) const fn entity_name(&self) -> &str {
394        self.entity_name.as_str()
395    }
396
397    /// Borrow the accepted field-path targets.
398    #[must_use]
399    pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
400        self.field_paths.as_slice()
401    }
402
403    /// Borrow the accepted key targets in DDL key order.
404    #[must_use]
405    pub(in crate::db) const fn key_items(&self) -> &[BoundSqlDdlCreateIndexKey] {
406        self.key_items.as_slice()
407    }
408
409    /// Borrow the candidate accepted index snapshot for mutation admission.
410    #[must_use]
411    pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
412        &self.candidate_index
413    }
414}
415
416///
417/// BoundSqlDropIndexRequest
418///
419/// Catalog-resolved request for dropping one DDL-published secondary index.
420///
421#[derive(Clone, Debug, Eq, PartialEq)]
422pub(in crate::db) struct BoundSqlDropIndexRequest {
423    index_name: String,
424    entity_name: String,
425    dropped_index: PersistedIndexSnapshot,
426    field_path: Vec<String>,
427}
428
429impl BoundSqlDropIndexRequest {
430    /// Borrow the requested index name.
431    #[must_use]
432    pub(in crate::db) const fn index_name(&self) -> &str {
433        self.index_name.as_str()
434    }
435
436    /// Borrow the accepted entity name that owns this request.
437    #[must_use]
438    pub(in crate::db) const fn entity_name(&self) -> &str {
439        self.entity_name.as_str()
440    }
441
442    /// Borrow the accepted index snapshot that will be removed.
443    #[must_use]
444    pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
445        &self.dropped_index
446    }
447
448    /// Borrow the dropped field-path target.
449    #[must_use]
450    pub(in crate::db) const fn field_path(&self) -> &[String] {
451        self.field_path.as_slice()
452    }
453}
454
455///
456/// BoundSqlDdlFieldPath
457///
458/// Accepted field-path target for SQL DDL binding.
459///
460#[derive(Clone, Debug, Eq, PartialEq)]
461pub(in crate::db) struct BoundSqlDdlFieldPath {
462    root: String,
463    segments: Vec<String>,
464    accepted_path: Vec<String>,
465}
466
467impl BoundSqlDdlFieldPath {
468    /// Borrow the top-level field name.
469    #[must_use]
470    pub(in crate::db) const fn root(&self) -> &str {
471        self.root.as_str()
472    }
473
474    /// Borrow nested path segments below the top-level field.
475    #[must_use]
476    pub(in crate::db) const fn segments(&self) -> &[String] {
477        self.segments.as_slice()
478    }
479
480    /// Borrow the full accepted field path used by index metadata.
481    #[must_use]
482    pub(in crate::db) const fn accepted_path(&self) -> &[String] {
483        self.accepted_path.as_slice()
484    }
485}
486
487///
488/// SqlDdlBindError
489///
490/// Typed fail-closed reasons for SQL DDL catalog binding.
491///
492#[derive(Debug, Eq, PartialEq, ThisError)]
493pub(in crate::db) enum SqlDdlBindError {
494    #[error("SQL DDL binder requires a DDL statement")]
495    NotDdl,
496
497    #[error("accepted schema does not expose an entity name")]
498    MissingEntityName,
499
500    #[error("accepted schema does not expose an entity path")]
501    MissingEntityPath,
502
503    #[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
504    EntityMismatch {
505        sql_entity: String,
506        expected_entity: String,
507    },
508
509    #[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
510    UnknownFieldPath {
511        entity_name: String,
512        field_path: String,
513    },
514
515    #[error("field path '{field_path}' is not indexable")]
516    FieldPathNotIndexable { field_path: String },
517
518    #[error("field path '{field_path}' depends on generated-only metadata")]
519    FieldPathNotAcceptedCatalogBacked { field_path: String },
520
521    #[error("invalid filtered index predicate: {detail}")]
522    InvalidFilteredIndexPredicate { detail: String },
523
524    #[error("index name '{index_name}' already exists in the accepted schema")]
525    DuplicateIndexName { index_name: String },
526
527    #[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
528    DuplicateFieldPathIndex {
529        field_path: String,
530        existing_index: String,
531    },
532
533    #[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
534    UnknownIndex {
535        entity_name: String,
536        index_name: String,
537    },
538
539    #[error(
540        "index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
541    )]
542    GeneratedIndexDropRejected { index_name: String },
543
544    #[error(
545        "index '{index_name}' is not a supported DDL-droppable secondary index; SQL DDL can currently drop only indexes created through SQL DDL"
546    )]
547    UnsupportedDropIndex { index_name: String },
548
549    #[error(
550        "SQL DDL ALTER TABLE ADD COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
551    )]
552    UnsupportedAlterTableAddColumn {
553        entity_name: String,
554        column_name: String,
555    },
556
557    #[error(
558        "SQL DDL ALTER TABLE ADD COLUMN DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
559    )]
560    InvalidAlterTableAddColumnDefault {
561        entity_name: String,
562        column_name: String,
563        detail: String,
564    },
565
566    #[error(
567        "SQL DDL ALTER TABLE ADD COLUMN NOT NULL is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
568    )]
569    UnsupportedAlterTableAddColumnNotNull {
570        entity_name: String,
571        column_name: String,
572    },
573
574    #[error("field '{column_name}' already exists in accepted entity '{entity_name}'")]
575    DuplicateColumn {
576        entity_name: String,
577        column_name: String,
578    },
579
580    #[error(
581        "SQL DDL ALTER TABLE ADD COLUMN type '{column_type}' is not supported yet for accepted entity '{entity_name}' column '{column_name}'"
582    )]
583    UnsupportedAlterTableAddColumnType {
584        entity_name: String,
585        column_name: String,
586        column_type: String,
587    },
588
589    #[error("unknown column '{column_name}' for accepted entity '{entity_name}'")]
590    UnknownColumn {
591        entity_name: String,
592        column_name: String,
593    },
594
595    #[error(
596        "SQL DDL ALTER TABLE ALTER COLUMN {action} is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
597    )]
598    UnsupportedAlterTableAlterColumn {
599        entity_name: String,
600        column_name: String,
601        action: String,
602    },
603
604    #[error(
605        "SQL DDL ALTER TABLE ALTER COLUMN SET DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
606    )]
607    InvalidAlterTableAlterColumnDefault {
608        entity_name: String,
609        column_name: String,
610        detail: String,
611    },
612
613    #[error(
614        "SQL DDL ALTER TABLE ALTER COLUMN DROP DEFAULT is not executable yet for required accepted entity '{entity_name}' column '{column_name}'"
615    )]
616    UnsupportedAlterTableDropDefaultRequired {
617        entity_name: String,
618        column_name: String,
619    },
620
621    #[error(
622        "SQL DDL ALTER TABLE ALTER COLUMN DEFAULT cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema default instead"
623    )]
624    GeneratedFieldDefaultChangeRejected {
625        entity_name: String,
626        column_name: String,
627    },
628}
629
630///
631/// SqlDdlLoweringError
632///
633/// Typed fail-closed reasons while lowering bound DDL into schema mutation
634/// admission.
635///
636#[derive(Debug, Eq, PartialEq, ThisError)]
637pub(in crate::db) enum SqlDdlLoweringError {
638    #[error("SQL DDL lowering requires a supported DDL statement")]
639    UnsupportedStatement,
640
641    #[error("schema mutation admission rejected DDL candidate: {0:?}")]
642    MutationAdmission(SchemaDdlMutationAdmissionError),
643}
644
645///
646/// SqlDdlPrepareError
647///
648/// Typed fail-closed preparation errors for SQL DDL.
649///
650#[derive(Debug, Eq, PartialEq, ThisError)]
651pub(in crate::db) enum SqlDdlPrepareError {
652    #[error("{0}")]
653    Bind(#[from] SqlDdlBindError),
654
655    #[error("{0}")]
656    Lowering(#[from] SqlDdlLoweringError),
657}
658
659/// Prepare one parsed SQL DDL statement through every pre-execution proof.
660pub(in crate::db) fn prepare_sql_ddl_statement(
661    statement: &SqlStatement,
662    accepted_before: &AcceptedSchemaSnapshot,
663    schema: &SchemaInfo,
664    index_store_path: &'static str,
665) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
666    let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
667    let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
668        None
669    } else {
670        Some(derive_bound_sql_ddl_accepted_after(
671            accepted_before,
672            &bound,
673        )?)
674    };
675    let report = ddl_preparation_report(&bound);
676
677    Ok(PreparedSqlDdlCommand {
678        bound,
679        derivation,
680        report,
681    })
682}
683
684/// Bind one parsed SQL DDL statement against accepted catalog metadata.
685pub(in crate::db) fn bind_sql_ddl_statement(
686    statement: &SqlStatement,
687    accepted_before: &AcceptedSchemaSnapshot,
688    schema: &SchemaInfo,
689    index_store_path: &'static str,
690) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
691    let SqlStatement::Ddl(ddl) = statement else {
692        return Err(SqlDdlBindError::NotDdl);
693    };
694
695    match ddl {
696        SqlDdlStatement::CreateIndex(statement) => {
697            bind_create_index_statement(statement, schema, index_store_path)
698        }
699        SqlDdlStatement::DropIndex(statement) => {
700            bind_drop_index_statement(statement, accepted_before, schema)
701        }
702        SqlDdlStatement::AlterTableAddColumn(statement) => {
703            bind_alter_table_add_column_statement(statement, accepted_before, schema)
704        }
705        SqlDdlStatement::AlterTableAlterColumn(statement) => {
706            bind_alter_table_alter_column_statement(statement, accepted_before, schema)
707        }
708    }
709}
710
711fn bind_create_index_statement(
712    statement: &SqlCreateIndexStatement,
713    schema: &SchemaInfo,
714    index_store_path: &'static str,
715) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
716    let entity_name = schema
717        .entity_name()
718        .ok_or(SqlDdlBindError::MissingEntityName)?;
719
720    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
721        return Err(SqlDdlBindError::EntityMismatch {
722            sql_entity: statement.entity.clone(),
723            expected_entity: entity_name.to_string(),
724        });
725    }
726
727    let key_items = statement
728        .key_items
729        .iter()
730        .map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
731        .collect::<Result<Vec<_>, _>>()?;
732    let field_paths = create_index_field_path_report_items(key_items.as_slice());
733    if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
734        if key_items_are_field_path_only(key_items.as_slice())
735            && statement.if_not_exists
736            && existing_field_path_index_matches_request(
737                existing_index,
738                field_paths.as_slice(),
739                statement.predicate_sql.as_deref(),
740                statement.uniqueness,
741            )
742        {
743            return Ok(BoundSqlDdlRequest {
744                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
745                    mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
746                    index_name: statement.name.clone(),
747                    entity_name: entity_name.to_string(),
748                    target_store: existing_index.store().to_string(),
749                    field_path: ddl_field_path_report(field_paths.as_slice()),
750                }),
751            });
752        }
753
754        return Err(SqlDdlBindError::DuplicateIndexName {
755            index_name: statement.name.clone(),
756        });
757    }
758    let predicate_sql =
759        validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
760    if let Some(existing_index) = find_expression_index_by_name(schema, statement.name.as_str()) {
761        if statement.if_not_exists
762            && existing_expression_index_matches_request(
763                existing_index,
764                key_items.as_slice(),
765                predicate_sql.as_deref(),
766                statement.uniqueness,
767            )
768        {
769            return Ok(BoundSqlDdlRequest {
770                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
771                    mutation_kind: SqlDdlMutationKind::AddExpressionIndex,
772                    index_name: statement.name.clone(),
773                    entity_name: entity_name.to_string(),
774                    target_store: existing_index.store().to_string(),
775                    field_path: ddl_key_item_report(key_items.as_slice()),
776                }),
777            });
778        }
779
780        return Err(SqlDdlBindError::DuplicateIndexName {
781            index_name: statement.name.clone(),
782        });
783    }
784    if key_items_are_field_path_only(key_items.as_slice()) {
785        reject_duplicate_field_path_index(
786            field_paths.as_slice(),
787            predicate_sql.as_deref(),
788            schema,
789        )?;
790    } else {
791        reject_duplicate_expression_index(key_items.as_slice(), predicate_sql.as_deref(), schema)?;
792    }
793    let candidate_index = candidate_index_snapshot(
794        statement.name.as_str(),
795        key_items.as_slice(),
796        predicate_sql.as_deref(),
797        statement.uniqueness,
798        schema,
799        index_store_path,
800    )?;
801
802    Ok(BoundSqlDdlRequest {
803        statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
804            index_name: statement.name.clone(),
805            entity_name: entity_name.to_string(),
806            key_items,
807            field_paths,
808            candidate_index,
809        }),
810    })
811}
812
813fn bind_drop_index_statement(
814    statement: &SqlDropIndexStatement,
815    accepted_before: &AcceptedSchemaSnapshot,
816    schema: &SchemaInfo,
817) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
818    let entity_name = schema
819        .entity_name()
820        .ok_or(SqlDdlBindError::MissingEntityName)?;
821
822    if let Some(sql_entity) = statement.entity.as_deref()
823        && !identifiers_tail_match(sql_entity, entity_name)
824    {
825        return Err(SqlDdlBindError::EntityMismatch {
826            sql_entity: sql_entity.to_string(),
827            expected_entity: entity_name.to_string(),
828        });
829    }
830    let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
831        accepted_before,
832        &statement.name,
833    )
834    .map_err(|error| match error {
835        SchemaDdlIndexDropCandidateError::Generated => {
836            SqlDdlBindError::GeneratedIndexDropRejected {
837                index_name: statement.name.clone(),
838            }
839        }
840        SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
841            entity_name: entity_name.to_string(),
842            index_name: statement.name.clone(),
843        },
844        SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
845            index_name: statement.name.clone(),
846        },
847    });
848    let (dropped_index, field_path) = match drop_candidate {
849        Ok((dropped_index, field_path)) => (dropped_index, field_path),
850        Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
851            return Ok(BoundSqlDdlRequest {
852                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
853                    mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
854                    index_name: statement.name.clone(),
855                    entity_name: entity_name.to_string(),
856                    target_store: "-".to_string(),
857                    field_path: Vec::new(),
858                }),
859            });
860        }
861        Err(error) => return Err(error),
862    };
863    Ok(BoundSqlDdlRequest {
864        statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
865            index_name: statement.name.clone(),
866            entity_name: entity_name.to_string(),
867            dropped_index,
868            field_path,
869        }),
870    })
871}
872
873fn bind_alter_table_add_column_statement(
874    statement: &SqlAlterTableAddColumnStatement,
875    accepted_before: &AcceptedSchemaSnapshot,
876    schema: &SchemaInfo,
877) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
878    let entity_name = schema
879        .entity_name()
880        .ok_or(SqlDdlBindError::MissingEntityName)?;
881
882    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
883        return Err(SqlDdlBindError::EntityMismatch {
884            sql_entity: statement.entity.clone(),
885            expected_entity: entity_name.to_string(),
886        });
887    }
888
889    if schema
890        .field_nullable(statement.column_name.as_str())
891        .is_some()
892    {
893        return Err(SqlDdlBindError::DuplicateColumn {
894            entity_name: entity_name.to_string(),
895            column_name: statement.column_name.clone(),
896        });
897    }
898
899    let (kind, storage_decode, leaf_codec) = persisted_field_contract_for_sql_column_type(
900        statement.column_type.as_str(),
901    )
902    .ok_or_else(|| SqlDdlBindError::UnsupportedAlterTableAddColumnType {
903        entity_name: entity_name.to_string(),
904        column_name: statement.column_name.clone(),
905        column_type: statement.column_type.clone(),
906    })?;
907    let default = schema_field_default_for_sql_default(
908        entity_name,
909        statement.column_name.as_str(),
910        statement.default.as_ref(),
911        &kind,
912        statement.nullable,
913        storage_decode,
914        leaf_codec,
915    )?;
916    if !statement.nullable && default.is_none() {
917        return Err(SqlDdlBindError::UnsupportedAlterTableAddColumnNotNull {
918            entity_name: entity_name.to_string(),
919            column_name: statement.column_name.clone(),
920        });
921    }
922    let field = PersistedFieldSnapshot::new_with_write_policy_and_origin(
923        next_sql_ddl_field_id(accepted_before),
924        statement.column_name.clone(),
925        next_sql_ddl_field_slot(accepted_before),
926        kind,
927        Vec::new(),
928        statement.nullable,
929        default,
930        SchemaFieldWritePolicy::from_model_policies(None, None),
931        PersistedFieldOrigin::SqlDdl,
932        storage_decode,
933        leaf_codec,
934    );
935
936    Ok(BoundSqlDdlRequest {
937        statement: BoundSqlDdlStatement::AddColumn(BoundSqlAddColumnRequest {
938            entity_name: entity_name.to_string(),
939            field,
940        }),
941    })
942}
943
944fn alter_table_alter_column_bind_error(
945    statement: &SqlAlterTableAlterColumnStatement,
946    schema: &SchemaInfo,
947) -> SqlDdlBindError {
948    let Some(entity_name) = schema.entity_name() else {
949        return SqlDdlBindError::MissingEntityName;
950    };
951
952    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
953        return SqlDdlBindError::EntityMismatch {
954            sql_entity: statement.entity.clone(),
955            expected_entity: entity_name.to_string(),
956        };
957    }
958
959    if schema
960        .field_nullable(statement.column_name.as_str())
961        .is_none()
962    {
963        return SqlDdlBindError::UnknownColumn {
964            entity_name: entity_name.to_string(),
965            column_name: statement.column_name.clone(),
966        };
967    }
968
969    SqlDdlBindError::UnsupportedAlterTableAlterColumn {
970        entity_name: entity_name.to_string(),
971        column_name: statement.column_name.clone(),
972        action: statement.action.label().to_string(),
973    }
974}
975
976fn bind_alter_table_alter_column_statement(
977    statement: &SqlAlterTableAlterColumnStatement,
978    accepted_before: &AcceptedSchemaSnapshot,
979    schema: &SchemaInfo,
980) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
981    let Some(entity_name) = schema.entity_name() else {
982        return Err(SqlDdlBindError::MissingEntityName);
983    };
984
985    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
986        return Err(SqlDdlBindError::EntityMismatch {
987            sql_entity: statement.entity.clone(),
988            expected_entity: entity_name.to_string(),
989        });
990    }
991
992    let field = accepted_before
993        .persisted_snapshot()
994        .fields()
995        .iter()
996        .find(|field| field.name() == statement.column_name)
997        .ok_or_else(|| SqlDdlBindError::UnknownColumn {
998            entity_name: entity_name.to_string(),
999            column_name: statement.column_name.clone(),
1000        })?;
1001
1002    match &statement.action {
1003        SqlAlterColumnAction::SetDefault(default) => {
1004            reject_generated_field_default_change(entity_name, field)?;
1005            let default =
1006                schema_field_default_for_alter_column_default(entity_name, field, default)?;
1007            Ok(bind_alter_table_alter_column_default(
1008                entity_name,
1009                field,
1010                default,
1011                SqlDdlMutationKind::SetFieldDefault,
1012            ))
1013        }
1014        SqlAlterColumnAction::DropDefault => {
1015            if !field.default().is_none() {
1016                reject_generated_field_default_change(entity_name, field)?;
1017            }
1018            if !field.nullable() && !field.default().is_none() {
1019                return Err(SqlDdlBindError::UnsupportedAlterTableDropDefaultRequired {
1020                    entity_name: entity_name.to_string(),
1021                    column_name: statement.column_name.clone(),
1022                });
1023            }
1024            Ok(bind_alter_table_alter_column_default(
1025                entity_name,
1026                field,
1027                SchemaFieldDefault::None,
1028                SqlDdlMutationKind::DropFieldDefault,
1029            ))
1030        }
1031        SqlAlterColumnAction::SetNotNull | SqlAlterColumnAction::DropNotNull => {
1032            Err(alter_table_alter_column_bind_error(statement, schema))
1033        }
1034    }
1035}
1036
1037fn bind_alter_table_alter_column_default(
1038    entity_name: &str,
1039    field: &PersistedFieldSnapshot,
1040    default: SchemaFieldDefault,
1041    mutation_kind: SqlDdlMutationKind,
1042) -> BoundSqlDdlRequest {
1043    if field.default() == &default {
1044        return BoundSqlDdlRequest {
1045            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1046                mutation_kind,
1047                index_name: field.name().to_string(),
1048                entity_name: entity_name.to_string(),
1049                target_store: entity_name.to_string(),
1050                field_path: vec![field.name().to_string()],
1051            }),
1052        };
1053    }
1054
1055    BoundSqlDdlRequest {
1056        statement: BoundSqlDdlStatement::AlterColumnDefault(BoundSqlAlterColumnDefaultRequest {
1057            entity_name: entity_name.to_string(),
1058            field: field.clone(),
1059            default,
1060            mutation_kind,
1061        }),
1062    }
1063}
1064
1065fn reject_generated_field_default_change(
1066    entity_name: &str,
1067    field: &PersistedFieldSnapshot,
1068) -> Result<(), SqlDdlBindError> {
1069    if field.generated() {
1070        return Err(SqlDdlBindError::GeneratedFieldDefaultChangeRejected {
1071            entity_name: entity_name.to_string(),
1072            column_name: field.name().to_string(),
1073        });
1074    }
1075
1076    Ok(())
1077}
1078
1079fn schema_field_default_for_sql_default(
1080    entity_name: &str,
1081    column_name: &str,
1082    default: Option<&crate::value::Value>,
1083    kind: &PersistedFieldKind,
1084    nullable: bool,
1085    storage_decode: FieldStorageDecode,
1086    leaf_codec: LeafCodec,
1087) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1088    let Some(default) = default else {
1089        return Ok(SchemaFieldDefault::None);
1090    };
1091    if matches!(default, crate::value::Value::Null) {
1092        return Err(SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1093            entity_name: entity_name.to_string(),
1094            column_name: column_name.to_string(),
1095            detail: "NULL cannot be used as an accepted database default".to_string(),
1096        });
1097    }
1098
1099    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(kind, default)
1100        .unwrap_or_else(|| default.clone());
1101    let contract =
1102        AcceptedFieldDecodeContract::new(column_name, kind, nullable, storage_decode, leaf_codec);
1103    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1104        |error| SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1105            entity_name: entity_name.to_string(),
1106            column_name: column_name.to_string(),
1107            detail: error.to_string(),
1108        },
1109    )?;
1110
1111    Ok(SchemaFieldDefault::SlotPayload(payload))
1112}
1113
1114fn schema_field_default_for_alter_column_default(
1115    entity_name: &str,
1116    field: &PersistedFieldSnapshot,
1117    default: &crate::value::Value,
1118) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1119    if matches!(default, crate::value::Value::Null) {
1120        return Err(SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1121            entity_name: entity_name.to_string(),
1122            column_name: field.name().to_string(),
1123            detail: "NULL cannot be used as an accepted database default".to_string(),
1124        });
1125    }
1126
1127    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(field.kind(), default)
1128        .unwrap_or_else(|| default.clone());
1129    let contract = AcceptedFieldDecodeContract::new(
1130        field.name(),
1131        field.kind(),
1132        field.nullable(),
1133        field.storage_decode(),
1134        field.leaf_codec(),
1135    );
1136    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1137        |error| SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1138            entity_name: entity_name.to_string(),
1139            column_name: field.name().to_string(),
1140            detail: error.to_string(),
1141        },
1142    )?;
1143
1144    Ok(SchemaFieldDefault::SlotPayload(payload))
1145}
1146
1147fn next_sql_ddl_field_id(accepted_before: &AcceptedSchemaSnapshot) -> FieldId {
1148    let next = accepted_before
1149        .persisted_snapshot()
1150        .fields()
1151        .iter()
1152        .map(|field| field.id().get())
1153        .max()
1154        .unwrap_or(0)
1155        .checked_add(1)
1156        .expect("accepted field IDs should not be exhausted");
1157
1158    FieldId::new(next)
1159}
1160
1161fn next_sql_ddl_field_slot(accepted_before: &AcceptedSchemaSnapshot) -> SchemaFieldSlot {
1162    let next = accepted_before
1163        .persisted_snapshot()
1164        .row_layout()
1165        .field_to_slot()
1166        .iter()
1167        .map(|(_, slot)| slot.get())
1168        .max()
1169        .unwrap_or(0)
1170        .checked_add(1)
1171        .expect("accepted row slots should not be exhausted");
1172
1173    SchemaFieldSlot::new(next)
1174}
1175
1176fn persisted_field_contract_for_sql_column_type(
1177    column_type: &str,
1178) -> Option<(PersistedFieldKind, FieldStorageDecode, LeafCodec)> {
1179    let normalized = column_type.trim().to_ascii_lowercase();
1180    match normalized.as_str() {
1181        "bool" | "boolean" => Some((
1182            PersistedFieldKind::Bool,
1183            FieldStorageDecode::ByKind,
1184            LeafCodec::Scalar(ScalarCodec::Bool),
1185        )),
1186        "int" | "integer" => Some((
1187            PersistedFieldKind::Int,
1188            FieldStorageDecode::ByKind,
1189            LeafCodec::Scalar(ScalarCodec::Int64),
1190        )),
1191        "nat" | "natural" => Some((
1192            PersistedFieldKind::Nat,
1193            FieldStorageDecode::ByKind,
1194            LeafCodec::Scalar(ScalarCodec::Nat64),
1195        )),
1196        "text" | "string" => Some((
1197            PersistedFieldKind::Text { max_len: None },
1198            FieldStorageDecode::ByKind,
1199            LeafCodec::Scalar(ScalarCodec::Text),
1200        )),
1201        _ => None,
1202    }
1203}
1204
///
/// BoundSqlDdlCreateIndexKey
///
/// One bound key item of a SQL DDL index request: either a plain field path
/// or an expression over a field path.
///
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum BoundSqlDdlCreateIndexKey {
    FieldPath(BoundSqlDdlFieldPath),
    Expression(BoundSqlDdlExpressionKey),
}
1210
///
/// BoundSqlDdlExpressionKey
///
/// Accepted expression-index key target for SQL DDL binding.
///
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct BoundSqlDdlExpressionKey {
    // Accepted expression operation applied to the source.
    op: PersistedIndexExpressionOp,
    // Accepted field path the expression reads from.
    source: BoundSqlDdlFieldPath,
    // SQL-facing canonical expression text.
    canonical_sql: String,
}
1222
// Read-only accessors for the bound expression key; all are `const fn`.
impl BoundSqlDdlExpressionKey {
    /// Return the accepted expression operation by value.
    #[must_use]
    pub(in crate::db) const fn op(&self) -> PersistedIndexExpressionOp {
        self.op
    }

    /// Borrow the accepted source field path the expression is applied to.
    #[must_use]
    pub(in crate::db) const fn source(&self) -> &BoundSqlDdlFieldPath {
        &self.source
    }

    /// Borrow the SQL-facing canonical expression text.
    #[must_use]
    pub(in crate::db) const fn canonical_sql(&self) -> &str {
        self.canonical_sql.as_str()
    }
}
1242
1243fn bind_create_index_key_item(
1244    key_item: &SqlCreateIndexKeyItem,
1245    entity_name: &str,
1246    schema: &SchemaInfo,
1247) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1248    match key_item {
1249        SqlCreateIndexKeyItem::FieldPath(field_path) => {
1250            bind_create_index_field_path(field_path.as_str(), entity_name, schema)
1251                .map(BoundSqlDdlCreateIndexKey::FieldPath)
1252        }
1253        SqlCreateIndexKeyItem::Expression(expression) => {
1254            bind_create_index_expression_key(expression, entity_name, schema)
1255        }
1256    }
1257}
1258
1259fn bind_create_index_expression_key(
1260    expression: &SqlCreateIndexExpressionKey,
1261    entity_name: &str,
1262    schema: &SchemaInfo,
1263) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1264    let source = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
1265
1266    Ok(BoundSqlDdlCreateIndexKey::Expression(
1267        BoundSqlDdlExpressionKey {
1268            op: expression_op_from_sql_function(expression.function),
1269            source,
1270            canonical_sql: expression.canonical_sql(),
1271        },
1272    ))
1273}
1274
1275const fn expression_op_from_sql_function(
1276    function: crate::db::sql::parser::SqlCreateIndexExpressionFunction,
1277) -> PersistedIndexExpressionOp {
1278    match function {
1279        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Lower => {
1280            PersistedIndexExpressionOp::Lower
1281        }
1282        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Upper => {
1283            PersistedIndexExpressionOp::Upper
1284        }
1285        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Trim => {
1286            PersistedIndexExpressionOp::Trim
1287        }
1288    }
1289}
1290
1291fn key_items_are_field_path_only(key_items: &[BoundSqlDdlCreateIndexKey]) -> bool {
1292    key_items
1293        .iter()
1294        .all(|key_item| matches!(key_item, BoundSqlDdlCreateIndexKey::FieldPath(_)))
1295}
1296
1297fn create_index_field_path_report_items(
1298    key_items: &[BoundSqlDdlCreateIndexKey],
1299) -> Vec<BoundSqlDdlFieldPath> {
1300    key_items
1301        .iter()
1302        .map(|key_item| match key_item {
1303            BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.clone(),
1304            BoundSqlDdlCreateIndexKey::Expression(expression) => expression.source().clone(),
1305        })
1306        .collect()
1307}
1308
1309fn bind_create_index_field_path(
1310    field_path: &str,
1311    entity_name: &str,
1312    schema: &SchemaInfo,
1313) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
1314    let mut path = field_path
1315        .split('.')
1316        .map(str::trim)
1317        .filter(|segment| !segment.is_empty());
1318    let Some(root) = path.next() else {
1319        return Err(SqlDdlBindError::UnknownFieldPath {
1320            entity_name: entity_name.to_string(),
1321            field_path: field_path.to_string(),
1322        });
1323    };
1324    let segments = path.map(str::to_string).collect::<Vec<_>>();
1325
1326    let capabilities = if segments.is_empty() {
1327        schema.sql_capabilities(root)
1328    } else {
1329        schema.nested_sql_capabilities(root, segments.as_slice())
1330    }
1331    .ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
1332        entity_name: entity_name.to_string(),
1333        field_path: field_path.to_string(),
1334    })?;
1335
1336    if !capabilities.orderable() {
1337        return Err(SqlDdlBindError::FieldPathNotIndexable {
1338            field_path: field_path.to_string(),
1339        });
1340    }
1341
1342    let mut accepted_path = Vec::with_capacity(segments.len() + 1);
1343    accepted_path.push(root.to_string());
1344    accepted_path.extend(segments.iter().cloned());
1345
1346    Ok(BoundSqlDdlFieldPath {
1347        root: root.to_string(),
1348        segments,
1349        accepted_path,
1350    })
1351}
1352
1353fn find_field_path_index_by_name<'a>(
1354    schema: &'a SchemaInfo,
1355    index_name: &str,
1356) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
1357    schema
1358        .field_path_indexes()
1359        .iter()
1360        .find(|index| index.name() == index_name)
1361}
1362
1363fn existing_field_path_index_matches_request(
1364    index: &crate::db::schema::SchemaIndexInfo,
1365    field_paths: &[BoundSqlDdlFieldPath],
1366    predicate_sql: Option<&str>,
1367    uniqueness: SqlCreateIndexUniqueness,
1368) -> bool {
1369    let fields = index.fields();
1370
1371    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1372        && index.predicate_sql() == predicate_sql
1373        && fields.len() == field_paths.len()
1374        && fields
1375            .iter()
1376            .zip(field_paths)
1377            .all(|(field, requested)| field.path() == requested.accepted_path())
1378}
1379
1380fn find_expression_index_by_name<'a>(
1381    schema: &'a SchemaInfo,
1382    index_name: &str,
1383) -> Option<&'a SchemaExpressionIndexInfo> {
1384    schema
1385        .expression_indexes()
1386        .iter()
1387        .find(|index| index.name() == index_name)
1388}
1389
1390fn existing_expression_index_matches_request(
1391    index: &SchemaExpressionIndexInfo,
1392    key_items: &[BoundSqlDdlCreateIndexKey],
1393    predicate_sql: Option<&str>,
1394    uniqueness: SqlCreateIndexUniqueness,
1395) -> bool {
1396    let existing_key_items = index.key_items();
1397
1398    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1399        && index.predicate_sql() == predicate_sql
1400        && existing_key_items.len() == key_items.len()
1401        && existing_key_items
1402            .iter()
1403            .zip(key_items)
1404            .all(existing_expression_key_item_matches_request)
1405}
1406
1407fn existing_expression_key_item_matches_request(
1408    existing: (
1409        &SchemaExpressionIndexKeyItemInfo,
1410        &BoundSqlDdlCreateIndexKey,
1411    ),
1412) -> bool {
1413    let (existing, requested) = existing;
1414    match (existing, requested) {
1415        (
1416            SchemaExpressionIndexKeyItemInfo::FieldPath(existing),
1417            BoundSqlDdlCreateIndexKey::FieldPath(requested),
1418        ) => existing.path() == requested.accepted_path(),
1419        (
1420            SchemaExpressionIndexKeyItemInfo::Expression(existing),
1421            BoundSqlDdlCreateIndexKey::Expression(requested),
1422        ) => existing_expression_component_matches_request(
1423            existing.op(),
1424            existing.source().path(),
1425            existing.canonical_text(),
1426            requested,
1427        ),
1428        _ => false,
1429    }
1430}
1431
1432fn existing_expression_component_matches_request(
1433    existing_op: PersistedIndexExpressionOp,
1434    existing_path: &[String],
1435    existing_canonical_text: &str,
1436    requested: &BoundSqlDdlExpressionKey,
1437) -> bool {
1438    let requested_path = requested.source().accepted_path();
1439    let requested_canonical_text = format!("expr:v1:{}", requested.canonical_sql());
1440
1441    existing_op == requested.op()
1442        && existing_path == requested_path
1443        && existing_canonical_text == requested_canonical_text
1444}
1445
1446fn reject_duplicate_expression_index(
1447    key_items: &[BoundSqlDdlCreateIndexKey],
1448    predicate_sql: Option<&str>,
1449    schema: &SchemaInfo,
1450) -> Result<(), SqlDdlBindError> {
1451    let Some(existing_index) = schema.expression_indexes().iter().find(|index| {
1452        existing_expression_index_matches_request(
1453            index,
1454            key_items,
1455            predicate_sql,
1456            if index.unique() {
1457                SqlCreateIndexUniqueness::Unique
1458            } else {
1459                SqlCreateIndexUniqueness::NonUnique
1460            },
1461        )
1462    }) else {
1463        return Ok(());
1464    };
1465
1466    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1467        field_path: ddl_key_item_report(key_items).join(","),
1468        existing_index: existing_index.name().to_string(),
1469    })
1470}
1471
1472fn reject_duplicate_field_path_index(
1473    field_paths: &[BoundSqlDdlFieldPath],
1474    predicate_sql: Option<&str>,
1475    schema: &SchemaInfo,
1476) -> Result<(), SqlDdlBindError> {
1477    let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
1478        let fields = index.fields();
1479        index.predicate_sql() == predicate_sql
1480            && fields.len() == field_paths.len()
1481            && fields
1482                .iter()
1483                .zip(field_paths)
1484                .all(|(field, requested)| field.path() == requested.accepted_path())
1485    }) else {
1486        return Ok(());
1487    };
1488
1489    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1490        field_path: ddl_field_path_report(field_paths).join(","),
1491        existing_index: existing_index.name().to_string(),
1492    })
1493}
1494
1495fn candidate_index_snapshot(
1496    index_name: &str,
1497    key_items: &[BoundSqlDdlCreateIndexKey],
1498    predicate_sql: Option<&str>,
1499    uniqueness: SqlCreateIndexUniqueness,
1500    schema: &SchemaInfo,
1501    index_store_path: &'static str,
1502) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
1503    let key = if key_items_are_field_path_only(key_items) {
1504        PersistedIndexKeySnapshot::FieldPath(
1505            key_items
1506                .iter()
1507                .map(|key_item| {
1508                    let BoundSqlDdlCreateIndexKey::FieldPath(field_path) = key_item else {
1509                        unreachable!("field-path-only index checked before field-path lowering");
1510                    };
1511
1512                    accepted_index_field_path_snapshot(schema, field_path)
1513                })
1514                .collect::<Result<Vec<_>, _>>()?,
1515        )
1516    } else {
1517        PersistedIndexKeySnapshot::Items(
1518            key_items
1519                .iter()
1520                .map(|key_item| match key_item {
1521                    BoundSqlDdlCreateIndexKey::FieldPath(field_path) => {
1522                        accepted_index_field_path_snapshot(schema, field_path)
1523                            .map(PersistedIndexKeyItemSnapshot::FieldPath)
1524                    }
1525                    BoundSqlDdlCreateIndexKey::Expression(expression) => {
1526                        accepted_index_expression_snapshot(schema, expression)
1527                    }
1528                })
1529                .collect::<Result<Vec<_>, _>>()?,
1530        )
1531    };
1532
1533    Ok(PersistedIndexSnapshot::new_sql_ddl(
1534        schema.next_secondary_index_ordinal(),
1535        index_name.to_string(),
1536        index_store_path.to_string(),
1537        matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
1538        key,
1539        predicate_sql.map(str::to_string),
1540    ))
1541}
1542
1543fn accepted_index_field_path_snapshot(
1544    schema: &SchemaInfo,
1545    field_path: &BoundSqlDdlFieldPath,
1546) -> Result<crate::db::schema::PersistedIndexFieldPathSnapshot, SqlDdlBindError> {
1547    schema
1548        .accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
1549        .ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
1550            field_path: field_path.accepted_path().join("."),
1551        })
1552}
1553
1554fn accepted_index_expression_snapshot(
1555    schema: &SchemaInfo,
1556    expression: &BoundSqlDdlExpressionKey,
1557) -> Result<PersistedIndexKeyItemSnapshot, SqlDdlBindError> {
1558    let source = accepted_index_field_path_snapshot(schema, expression.source())?;
1559    let Some(output_kind) = expression_output_kind(expression.op(), source.kind()) else {
1560        return Err(SqlDdlBindError::FieldPathNotIndexable {
1561            field_path: expression.source().accepted_path().join("."),
1562        });
1563    };
1564
1565    Ok(PersistedIndexKeyItemSnapshot::Expression(Box::new(
1566        PersistedIndexExpressionSnapshot::new(
1567            expression.op(),
1568            source.clone(),
1569            source.kind().clone(),
1570            output_kind,
1571            format!("expr:v1:{}", expression.canonical_sql()),
1572        ),
1573    )))
1574}
1575
1576fn expression_output_kind(
1577    op: PersistedIndexExpressionOp,
1578    source_kind: &PersistedFieldKind,
1579) -> Option<PersistedFieldKind> {
1580    match op {
1581        PersistedIndexExpressionOp::Lower
1582        | PersistedIndexExpressionOp::Upper
1583        | PersistedIndexExpressionOp::Trim
1584        | PersistedIndexExpressionOp::LowerTrim => {
1585            if matches!(source_kind, PersistedFieldKind::Text { .. }) {
1586                Some(source_kind.clone())
1587            } else {
1588                None
1589            }
1590        }
1591        PersistedIndexExpressionOp::Date => {
1592            if matches!(
1593                source_kind,
1594                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1595            ) {
1596                Some(PersistedFieldKind::Date)
1597            } else {
1598                None
1599            }
1600        }
1601        PersistedIndexExpressionOp::Year
1602        | PersistedIndexExpressionOp::Month
1603        | PersistedIndexExpressionOp::Day => {
1604            if matches!(
1605                source_kind,
1606                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1607            ) {
1608                Some(PersistedFieldKind::Int)
1609            } else {
1610                None
1611            }
1612        }
1613    }
1614}
1615
1616fn validated_create_index_predicate_sql(
1617    predicate_sql: Option<&str>,
1618    schema: &SchemaInfo,
1619) -> Result<Option<String>, SqlDdlBindError> {
1620    let Some(predicate_sql) = predicate_sql else {
1621        return Ok(None);
1622    };
1623    let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
1624        SqlDdlBindError::InvalidFilteredIndexPredicate {
1625            detail: error.to_string(),
1626        }
1627    })?;
1628    validate_predicate(schema, &predicate).map_err(|error| {
1629        SqlDdlBindError::InvalidFilteredIndexPredicate {
1630            detail: error.to_string(),
1631        }
1632    })?;
1633
1634    Ok(Some(predicate_sql.to_string()))
1635}
1636
1637fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
1638    match field_paths {
1639        [field_path] => field_path.accepted_path().to_vec(),
1640        _ => vec![
1641            field_paths
1642                .iter()
1643                .map(|field_path| field_path.accepted_path().join("."))
1644                .collect::<Vec<_>>()
1645                .join(","),
1646        ],
1647    }
1648}
1649
1650fn ddl_key_item_report(key_items: &[BoundSqlDdlCreateIndexKey]) -> Vec<String> {
1651    match key_items {
1652        [key_item] => vec![ddl_key_item_text(key_item)],
1653        _ => vec![
1654            key_items
1655                .iter()
1656                .map(ddl_key_item_text)
1657                .collect::<Vec<_>>()
1658                .join(","),
1659        ],
1660    }
1661}
1662
1663fn ddl_key_item_text(key_item: &BoundSqlDdlCreateIndexKey) -> String {
1664    match key_item {
1665        BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.accepted_path().join("."),
1666        BoundSqlDdlCreateIndexKey::Expression(expression) => expression.canonical_sql().to_string(),
1667    }
1668}
1669
1670/// Lower one bound SQL DDL request through schema mutation admission.
1671pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
1672    request: &BoundSqlDdlRequest,
1673) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
1674    match request.statement() {
1675        BoundSqlDdlStatement::AddColumn(add) => {
1676            Ok(admit_sql_ddl_field_addition_candidate(add.field()))
1677        }
1678        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
1679            Ok(admit_sql_ddl_field_default_candidate(alter.field()))
1680        }
1681        BoundSqlDdlStatement::CreateIndex(create) => {
1682            if create.candidate_index().key().is_field_path_only() {
1683                admit_sql_ddl_field_path_index_candidate(create.candidate_index())
1684            } else {
1685                admit_sql_ddl_expression_index_candidate(create.candidate_index())
1686            }
1687        }
1688        BoundSqlDdlStatement::DropIndex(drop) => {
1689            admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
1690        }
1691        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1692    }
1693    .map_err(SqlDdlLoweringError::MutationAdmission)
1694}
1695
1696/// Derive the accepted-after schema snapshot for one bound SQL DDL request.
1697pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
1698    accepted_before: &AcceptedSchemaSnapshot,
1699    request: &BoundSqlDdlRequest,
1700) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
1701    match request.statement() {
1702        BoundSqlDdlStatement::AddColumn(add) => {
1703            derive_sql_ddl_field_addition_accepted_after(accepted_before, add.field().clone())
1704        }
1705        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
1706            derive_sql_ddl_field_default_accepted_after(
1707                accepted_before,
1708                alter.field_name(),
1709                alter.default().clone(),
1710            )
1711        }
1712        BoundSqlDdlStatement::CreateIndex(create) => {
1713            if create.candidate_index().key().is_field_path_only() {
1714                derive_sql_ddl_field_path_index_accepted_after(
1715                    accepted_before,
1716                    create.candidate_index().clone(),
1717                )
1718            } else {
1719                derive_sql_ddl_expression_index_accepted_after(
1720                    accepted_before,
1721                    create.candidate_index().clone(),
1722                )
1723            }
1724        }
1725        BoundSqlDdlStatement::DropIndex(drop) => {
1726            derive_sql_ddl_secondary_index_drop_accepted_after(
1727                accepted_before,
1728                drop.dropped_index(),
1729            )
1730        }
1731        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1732    }
1733    .map_err(SqlDdlLoweringError::MutationAdmission)
1734}
1735
1736fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
1737    match bound.statement() {
1738        BoundSqlDdlStatement::AddColumn(add) => SqlDdlPreparationReport {
1739            mutation_kind: if add.field().default().is_none() {
1740                SqlDdlMutationKind::AddNullableField
1741            } else {
1742                SqlDdlMutationKind::AddDefaultedField
1743            },
1744            target_index: add.field().name().to_string(),
1745            target_store: add.entity_name().to_string(),
1746            field_path: vec![add.field().name().to_string()],
1747            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1748            rows_scanned: 0,
1749            index_keys_written: 0,
1750        },
1751        BoundSqlDdlStatement::AlterColumnDefault(alter) => SqlDdlPreparationReport {
1752            mutation_kind: alter.mutation_kind(),
1753            target_index: alter.field_name().to_string(),
1754            target_store: alter.entity_name().to_string(),
1755            field_path: vec![alter.field_name().to_string()],
1756            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1757            rows_scanned: 0,
1758            index_keys_written: 0,
1759        },
1760        BoundSqlDdlStatement::CreateIndex(create) => {
1761            let target = create.candidate_index();
1762
1763            SqlDdlPreparationReport {
1764                mutation_kind: if target.key().is_field_path_only() {
1765                    SqlDdlMutationKind::AddFieldPathIndex
1766                } else {
1767                    SqlDdlMutationKind::AddExpressionIndex
1768                },
1769                target_index: target.name().to_string(),
1770                target_store: target.store().to_string(),
1771                field_path: ddl_key_item_report(create.key_items()),
1772                execution_status: SqlDdlExecutionStatus::PreparedOnly,
1773                rows_scanned: 0,
1774                index_keys_written: 0,
1775            }
1776        }
1777        BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
1778            mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
1779            target_index: drop.index_name().to_string(),
1780            target_store: drop.dropped_index().store().to_string(),
1781            field_path: drop.field_path().to_vec(),
1782            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1783            rows_scanned: 0,
1784            index_keys_written: 0,
1785        },
1786        BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
1787            mutation_kind: no_op.mutation_kind(),
1788            target_index: no_op.index_name().to_string(),
1789            target_store: no_op.target_store().to_string(),
1790            field_path: no_op.field_path().to_vec(),
1791            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1792            rows_scanned: 0,
1793            index_keys_written: 0,
1794        },
1795    }
1796}