Skip to main content

icydb_core/db/sql/
ddl.rs

1//! Module: db::sql::ddl
2//! Responsibility: bind parsed SQL DDL to accepted schema catalog contracts.
3//! Does not own: mutation planning, physical index rebuilds, or SQL execution.
4//! Boundary: translates parser-owned DDL syntax into catalog-native requests.
5
6#![allow(
7    dead_code,
8    reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
9)]
10
11use crate::db::{
12    data::encode_runtime_value_for_accepted_field_contract,
13    predicate::parse_sql_predicate,
14    query::predicate::validate_predicate,
15    schema::{
16        AcceptedFieldDecodeContract, AcceptedSchemaSnapshot, FieldId, PersistedFieldKind,
17        PersistedFieldOrigin, PersistedFieldSnapshot, PersistedIndexExpressionOp,
18        PersistedIndexExpressionSnapshot, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
19        PersistedIndexSnapshot, SchemaDdlAcceptedSnapshotDerivation,
20        SchemaDdlIndexDropCandidateError, SchemaDdlMutationAdmission,
21        SchemaDdlMutationAdmissionError, SchemaExpressionIndexInfo,
22        SchemaExpressionIndexKeyItemInfo, SchemaFieldDefault, SchemaFieldSlot,
23        SchemaFieldWritePolicy, SchemaInfo, admit_sql_ddl_expression_index_candidate,
24        admit_sql_ddl_field_addition_candidate, admit_sql_ddl_field_path_index_candidate,
25        admit_sql_ddl_secondary_index_drop_candidate,
26        canonicalize_strict_sql_literal_for_persisted_kind,
27        derive_sql_ddl_expression_index_accepted_after,
28        derive_sql_ddl_field_addition_accepted_after,
29        derive_sql_ddl_field_path_index_accepted_after,
30        derive_sql_ddl_secondary_index_drop_accepted_after,
31        resolve_sql_ddl_secondary_index_drop_candidate,
32    },
33    sql::{
34        identifier::identifiers_tail_match,
35        parser::{
36            SqlAlterTableAddColumnStatement, SqlAlterTableAlterColumnStatement,
37            SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem, SqlCreateIndexStatement,
38            SqlCreateIndexUniqueness, SqlDdlStatement, SqlDropIndexStatement, SqlStatement,
39        },
40    },
41};
42use crate::model::field::{FieldStorageDecode, LeafCodec, ScalarCodec};
43use thiserror::Error as ThisError;
44
45///
46/// PreparedSqlDdlCommand
47///
48/// Fully prepared SQL DDL command. This is intentionally not executable yet:
49/// it packages the accepted-catalog binding, accepted-after derivation, and
50/// schema mutation admission proof for the future execution boundary.
51///
52#[derive(Clone, Debug, Eq, PartialEq)]
53pub(in crate::db) struct PreparedSqlDdlCommand {
54    bound: BoundSqlDdlRequest,
55    derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
56    report: SqlDdlPreparationReport,
57}
58
59impl PreparedSqlDdlCommand {
60    /// Borrow the accepted-catalog-bound DDL request.
61    #[must_use]
62    pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
63        &self.bound
64    }
65
66    /// Borrow the accepted-after derivation proof.
67    #[must_use]
68    pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
69        self.derivation.as_ref()
70    }
71
72    /// Borrow the developer-facing preparation report.
73    #[must_use]
74    pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
75        &self.report
76    }
77
78    /// Return whether this prepared command needs schema or storage mutation.
79    #[must_use]
80    pub(in crate::db) const fn mutates_schema(&self) -> bool {
81        self.derivation.is_some()
82    }
83}
84
85///
86/// SqlDdlPreparationReport
87///
88/// Compact report for a DDL command that has passed all pre-execution
89/// frontend and schema-mutation checks.
90///
91#[derive(Clone, Debug, Eq, PartialEq)]
92pub struct SqlDdlPreparationReport {
93    mutation_kind: SqlDdlMutationKind,
94    target_index: String,
95    target_store: String,
96    field_path: Vec<String>,
97    execution_status: SqlDdlExecutionStatus,
98    rows_scanned: usize,
99    index_keys_written: usize,
100}
101
102impl SqlDdlPreparationReport {
103    /// Return the prepared DDL mutation kind.
104    #[must_use]
105    pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
106        self.mutation_kind
107    }
108
109    /// Borrow the target accepted index name.
110    #[must_use]
111    pub const fn target_index(&self) -> &str {
112        self.target_index.as_str()
113    }
114
115    /// Borrow the target accepted index store path.
116    #[must_use]
117    pub const fn target_store(&self) -> &str {
118        self.target_store.as_str()
119    }
120
121    /// Borrow the target field path.
122    #[must_use]
123    pub const fn field_path(&self) -> &[String] {
124        self.field_path.as_slice()
125    }
126
127    /// Return the execution status captured by this DDL report.
128    #[must_use]
129    pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
130        self.execution_status
131    }
132
133    /// Return rows scanned by DDL execution.
134    #[must_use]
135    pub const fn rows_scanned(&self) -> usize {
136        self.rows_scanned
137    }
138
139    /// Return index keys written by DDL execution.
140    #[must_use]
141    pub const fn index_keys_written(&self) -> usize {
142        self.index_keys_written
143    }
144
145    pub(in crate::db) const fn with_execution_status(
146        mut self,
147        execution_status: SqlDdlExecutionStatus,
148    ) -> Self {
149        self.execution_status = execution_status;
150        self
151    }
152
153    pub(in crate::db) const fn with_execution_metrics(
154        mut self,
155        rows_scanned: usize,
156        index_keys_written: usize,
157    ) -> Self {
158        self.rows_scanned = rows_scanned;
159        self.index_keys_written = index_keys_written;
160        self
161    }
162}
163
164///
165/// SqlDdlMutationKind
166///
167/// Developer-facing SQL DDL mutation kind.
168///
169#[derive(Clone, Copy, Debug, Eq, PartialEq)]
170pub enum SqlDdlMutationKind {
171    AddDefaultedField,
172    AddNullableField,
173    AddFieldPathIndex,
174    AddExpressionIndex,
175    DropSecondaryIndex,
176}
177
178impl SqlDdlMutationKind {
179    /// Return the stable diagnostic label for this DDL mutation kind.
180    #[must_use]
181    pub const fn as_str(self) -> &'static str {
182        match self {
183            Self::AddDefaultedField => "add_defaulted_field",
184            Self::AddNullableField => "add_nullable_field",
185            Self::AddFieldPathIndex => "add_field_path_index",
186            Self::AddExpressionIndex => "add_expression_index",
187            Self::DropSecondaryIndex => "drop_secondary_index",
188        }
189    }
190}
191
192///
193/// SqlDdlExecutionStatus
194///
195/// SQL DDL execution state at the current boundary.
196///
197#[derive(Clone, Copy, Debug, Eq, PartialEq)]
198pub enum SqlDdlExecutionStatus {
199    PreparedOnly,
200    Published,
201    NoOp,
202}
203
204impl SqlDdlExecutionStatus {
205    /// Return the stable diagnostic label for this execution status.
206    #[must_use]
207    pub const fn as_str(self) -> &'static str {
208        match self {
209            Self::PreparedOnly => "prepared_only",
210            Self::Published => "published",
211            Self::NoOp => "no_op",
212        }
213    }
214}
215
216///
217/// BoundSqlDdlRequest
218///
219/// Accepted-catalog SQL DDL request after parser syntax has been resolved
220/// against one runtime schema snapshot.
221///
222#[derive(Clone, Debug, Eq, PartialEq)]
223pub(in crate::db) struct BoundSqlDdlRequest {
224    statement: BoundSqlDdlStatement,
225}
226
227impl BoundSqlDdlRequest {
228    /// Borrow the bound statement payload.
229    #[must_use]
230    pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
231        &self.statement
232    }
233}
234
235///
236/// BoundSqlDdlStatement
237///
238/// Catalog-resolved DDL statement vocabulary.
239///
240#[derive(Clone, Debug, Eq, PartialEq)]
241pub(in crate::db) enum BoundSqlDdlStatement {
242    AddColumn(BoundSqlAddColumnRequest),
243    CreateIndex(BoundSqlCreateIndexRequest),
244    DropIndex(BoundSqlDropIndexRequest),
245    NoOp(BoundSqlDdlNoOpRequest),
246}
247
248///
249/// BoundSqlAddColumnRequest
250///
251/// Catalog-resolved additive field DDL request.
252///
253#[derive(Clone, Debug, Eq, PartialEq)]
254pub(in crate::db) struct BoundSqlAddColumnRequest {
255    entity_name: String,
256    field: PersistedFieldSnapshot,
257}
258
259impl BoundSqlAddColumnRequest {
260    /// Borrow the accepted entity name.
261    #[must_use]
262    pub(in crate::db) const fn entity_name(&self) -> &str {
263        self.entity_name.as_str()
264    }
265
266    /// Borrow the accepted DDL-owned field snapshot to publish.
267    #[must_use]
268    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
269        &self.field
270    }
271}
272
273///
274/// BoundSqlDdlNoOpRequest
275///
276/// Catalog-resolved idempotent DDL request that is already satisfied or absent.
277///
278#[derive(Clone, Debug, Eq, PartialEq)]
279pub(in crate::db) struct BoundSqlDdlNoOpRequest {
280    mutation_kind: SqlDdlMutationKind,
281    index_name: String,
282    entity_name: String,
283    target_store: String,
284    field_path: Vec<String>,
285}
286
287impl BoundSqlDdlNoOpRequest {
288    /// Return the user-facing mutation family this no-op belongs to.
289    #[must_use]
290    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
291        self.mutation_kind
292    }
293
294    /// Borrow the requested index name.
295    #[must_use]
296    pub(in crate::db) const fn index_name(&self) -> &str {
297        self.index_name.as_str()
298    }
299
300    /// Borrow the accepted entity name that owns this request.
301    #[must_use]
302    pub(in crate::db) const fn entity_name(&self) -> &str {
303        self.entity_name.as_str()
304    }
305
306    /// Borrow the accepted index store path, or `-` when no target exists.
307    #[must_use]
308    pub(in crate::db) const fn target_store(&self) -> &str {
309        self.target_store.as_str()
310    }
311
312    /// Borrow the target field path, empty when no target exists.
313    #[must_use]
314    pub(in crate::db) const fn field_path(&self) -> &[String] {
315        self.field_path.as_slice()
316    }
317}
318
319///
320/// BoundSqlCreateIndexRequest
321///
322/// Catalog-resolved request for adding one secondary index.
323///
324#[derive(Clone, Debug, Eq, PartialEq)]
325pub(in crate::db) struct BoundSqlCreateIndexRequest {
326    index_name: String,
327    entity_name: String,
328    key_items: Vec<BoundSqlDdlCreateIndexKey>,
329    field_paths: Vec<BoundSqlDdlFieldPath>,
330    candidate_index: PersistedIndexSnapshot,
331}
332
333impl BoundSqlCreateIndexRequest {
334    /// Borrow the requested index name.
335    #[must_use]
336    pub(in crate::db) const fn index_name(&self) -> &str {
337        self.index_name.as_str()
338    }
339
340    /// Borrow the accepted entity name that owns this request.
341    #[must_use]
342    pub(in crate::db) const fn entity_name(&self) -> &str {
343        self.entity_name.as_str()
344    }
345
346    /// Borrow the accepted field-path targets.
347    #[must_use]
348    pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
349        self.field_paths.as_slice()
350    }
351
352    /// Borrow the accepted key targets in DDL key order.
353    #[must_use]
354    pub(in crate::db) const fn key_items(&self) -> &[BoundSqlDdlCreateIndexKey] {
355        self.key_items.as_slice()
356    }
357
358    /// Borrow the candidate accepted index snapshot for mutation admission.
359    #[must_use]
360    pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
361        &self.candidate_index
362    }
363}
364
365///
366/// BoundSqlDropIndexRequest
367///
368/// Catalog-resolved request for dropping one DDL-published secondary index.
369///
370#[derive(Clone, Debug, Eq, PartialEq)]
371pub(in crate::db) struct BoundSqlDropIndexRequest {
372    index_name: String,
373    entity_name: String,
374    dropped_index: PersistedIndexSnapshot,
375    field_path: Vec<String>,
376}
377
378impl BoundSqlDropIndexRequest {
379    /// Borrow the requested index name.
380    #[must_use]
381    pub(in crate::db) const fn index_name(&self) -> &str {
382        self.index_name.as_str()
383    }
384
385    /// Borrow the accepted entity name that owns this request.
386    #[must_use]
387    pub(in crate::db) const fn entity_name(&self) -> &str {
388        self.entity_name.as_str()
389    }
390
391    /// Borrow the accepted index snapshot that will be removed.
392    #[must_use]
393    pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
394        &self.dropped_index
395    }
396
397    /// Borrow the dropped field-path target.
398    #[must_use]
399    pub(in crate::db) const fn field_path(&self) -> &[String] {
400        self.field_path.as_slice()
401    }
402}
403
404///
405/// BoundSqlDdlFieldPath
406///
407/// Accepted field-path target for SQL DDL binding.
408///
409#[derive(Clone, Debug, Eq, PartialEq)]
410pub(in crate::db) struct BoundSqlDdlFieldPath {
411    root: String,
412    segments: Vec<String>,
413    accepted_path: Vec<String>,
414}
415
416impl BoundSqlDdlFieldPath {
417    /// Borrow the top-level field name.
418    #[must_use]
419    pub(in crate::db) const fn root(&self) -> &str {
420        self.root.as_str()
421    }
422
423    /// Borrow nested path segments below the top-level field.
424    #[must_use]
425    pub(in crate::db) const fn segments(&self) -> &[String] {
426        self.segments.as_slice()
427    }
428
429    /// Borrow the full accepted field path used by index metadata.
430    #[must_use]
431    pub(in crate::db) const fn accepted_path(&self) -> &[String] {
432        self.accepted_path.as_slice()
433    }
434}
435
436///
437/// SqlDdlBindError
438///
439/// Typed fail-closed reasons for SQL DDL catalog binding.
440///
441#[derive(Debug, Eq, PartialEq, ThisError)]
442pub(in crate::db) enum SqlDdlBindError {
443    #[error("SQL DDL binder requires a DDL statement")]
444    NotDdl,
445
446    #[error("accepted schema does not expose an entity name")]
447    MissingEntityName,
448
449    #[error("accepted schema does not expose an entity path")]
450    MissingEntityPath,
451
452    #[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
453    EntityMismatch {
454        sql_entity: String,
455        expected_entity: String,
456    },
457
458    #[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
459    UnknownFieldPath {
460        entity_name: String,
461        field_path: String,
462    },
463
464    #[error("field path '{field_path}' is not indexable")]
465    FieldPathNotIndexable { field_path: String },
466
467    #[error("field path '{field_path}' depends on generated-only metadata")]
468    FieldPathNotAcceptedCatalogBacked { field_path: String },
469
470    #[error("invalid filtered index predicate: {detail}")]
471    InvalidFilteredIndexPredicate { detail: String },
472
473    #[error("index name '{index_name}' already exists in the accepted schema")]
474    DuplicateIndexName { index_name: String },
475
476    #[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
477    DuplicateFieldPathIndex {
478        field_path: String,
479        existing_index: String,
480    },
481
482    #[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
483    UnknownIndex {
484        entity_name: String,
485        index_name: String,
486    },
487
488    #[error(
489        "index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
490    )]
491    GeneratedIndexDropRejected { index_name: String },
492
493    #[error(
494        "index '{index_name}' is not a supported DDL-droppable secondary index; SQL DDL can currently drop only indexes created through SQL DDL"
495    )]
496    UnsupportedDropIndex { index_name: String },
497
498    #[error(
499        "SQL DDL ALTER TABLE ADD COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
500    )]
501    UnsupportedAlterTableAddColumn {
502        entity_name: String,
503        column_name: String,
504    },
505
506    #[error(
507        "SQL DDL ALTER TABLE ADD COLUMN DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
508    )]
509    InvalidAlterTableAddColumnDefault {
510        entity_name: String,
511        column_name: String,
512        detail: String,
513    },
514
515    #[error(
516        "SQL DDL ALTER TABLE ADD COLUMN NOT NULL is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
517    )]
518    UnsupportedAlterTableAddColumnNotNull {
519        entity_name: String,
520        column_name: String,
521    },
522
523    #[error("field '{column_name}' already exists in accepted entity '{entity_name}'")]
524    DuplicateColumn {
525        entity_name: String,
526        column_name: String,
527    },
528
529    #[error(
530        "SQL DDL ALTER TABLE ADD COLUMN type '{column_type}' is not supported yet for accepted entity '{entity_name}' column '{column_name}'"
531    )]
532    UnsupportedAlterTableAddColumnType {
533        entity_name: String,
534        column_name: String,
535        column_type: String,
536    },
537
538    #[error("unknown column '{column_name}' for accepted entity '{entity_name}'")]
539    UnknownColumn {
540        entity_name: String,
541        column_name: String,
542    },
543
544    #[error(
545        "SQL DDL ALTER TABLE ALTER COLUMN {action} is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
546    )]
547    UnsupportedAlterTableAlterColumn {
548        entity_name: String,
549        column_name: String,
550        action: String,
551    },
552}
553
554///
555/// SqlDdlLoweringError
556///
557/// Typed fail-closed reasons while lowering bound DDL into schema mutation
558/// admission.
559///
560#[derive(Debug, Eq, PartialEq, ThisError)]
561pub(in crate::db) enum SqlDdlLoweringError {
562    #[error("SQL DDL lowering requires a supported DDL statement")]
563    UnsupportedStatement,
564
565    #[error("schema mutation admission rejected DDL candidate: {0:?}")]
566    MutationAdmission(SchemaDdlMutationAdmissionError),
567}
568
569///
570/// SqlDdlPrepareError
571///
572/// Typed fail-closed preparation errors for SQL DDL.
573///
574#[derive(Debug, Eq, PartialEq, ThisError)]
575pub(in crate::db) enum SqlDdlPrepareError {
576    #[error("{0}")]
577    Bind(#[from] SqlDdlBindError),
578
579    #[error("{0}")]
580    Lowering(#[from] SqlDdlLoweringError),
581}
582
583/// Prepare one parsed SQL DDL statement through every pre-execution proof.
584pub(in crate::db) fn prepare_sql_ddl_statement(
585    statement: &SqlStatement,
586    accepted_before: &AcceptedSchemaSnapshot,
587    schema: &SchemaInfo,
588    index_store_path: &'static str,
589) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
590    let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
591    let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
592        None
593    } else {
594        Some(derive_bound_sql_ddl_accepted_after(
595            accepted_before,
596            &bound,
597        )?)
598    };
599    let report = ddl_preparation_report(&bound);
600
601    Ok(PreparedSqlDdlCommand {
602        bound,
603        derivation,
604        report,
605    })
606}
607
608/// Bind one parsed SQL DDL statement against accepted catalog metadata.
609pub(in crate::db) fn bind_sql_ddl_statement(
610    statement: &SqlStatement,
611    accepted_before: &AcceptedSchemaSnapshot,
612    schema: &SchemaInfo,
613    index_store_path: &'static str,
614) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
615    let SqlStatement::Ddl(ddl) = statement else {
616        return Err(SqlDdlBindError::NotDdl);
617    };
618
619    match ddl {
620        SqlDdlStatement::CreateIndex(statement) => {
621            bind_create_index_statement(statement, schema, index_store_path)
622        }
623        SqlDdlStatement::DropIndex(statement) => {
624            bind_drop_index_statement(statement, accepted_before, schema)
625        }
626        SqlDdlStatement::AlterTableAddColumn(statement) => {
627            bind_alter_table_add_column_statement(statement, accepted_before, schema)
628        }
629        SqlDdlStatement::AlterTableAlterColumn(statement) => {
630            Err(alter_table_alter_column_bind_error(statement, schema))
631        }
632    }
633}
634
635fn bind_create_index_statement(
636    statement: &SqlCreateIndexStatement,
637    schema: &SchemaInfo,
638    index_store_path: &'static str,
639) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
640    let entity_name = schema
641        .entity_name()
642        .ok_or(SqlDdlBindError::MissingEntityName)?;
643
644    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
645        return Err(SqlDdlBindError::EntityMismatch {
646            sql_entity: statement.entity.clone(),
647            expected_entity: entity_name.to_string(),
648        });
649    }
650
651    let key_items = statement
652        .key_items
653        .iter()
654        .map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
655        .collect::<Result<Vec<_>, _>>()?;
656    let field_paths = create_index_field_path_report_items(key_items.as_slice());
657    if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
658        if key_items_are_field_path_only(key_items.as_slice())
659            && statement.if_not_exists
660            && existing_field_path_index_matches_request(
661                existing_index,
662                field_paths.as_slice(),
663                statement.predicate_sql.as_deref(),
664                statement.uniqueness,
665            )
666        {
667            return Ok(BoundSqlDdlRequest {
668                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
669                    mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
670                    index_name: statement.name.clone(),
671                    entity_name: entity_name.to_string(),
672                    target_store: existing_index.store().to_string(),
673                    field_path: ddl_field_path_report(field_paths.as_slice()),
674                }),
675            });
676        }
677
678        return Err(SqlDdlBindError::DuplicateIndexName {
679            index_name: statement.name.clone(),
680        });
681    }
682    let predicate_sql =
683        validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
684    if let Some(existing_index) = find_expression_index_by_name(schema, statement.name.as_str()) {
685        if statement.if_not_exists
686            && existing_expression_index_matches_request(
687                existing_index,
688                key_items.as_slice(),
689                predicate_sql.as_deref(),
690                statement.uniqueness,
691            )
692        {
693            return Ok(BoundSqlDdlRequest {
694                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
695                    mutation_kind: SqlDdlMutationKind::AddExpressionIndex,
696                    index_name: statement.name.clone(),
697                    entity_name: entity_name.to_string(),
698                    target_store: existing_index.store().to_string(),
699                    field_path: ddl_key_item_report(key_items.as_slice()),
700                }),
701            });
702        }
703
704        return Err(SqlDdlBindError::DuplicateIndexName {
705            index_name: statement.name.clone(),
706        });
707    }
708    if key_items_are_field_path_only(key_items.as_slice()) {
709        reject_duplicate_field_path_index(
710            field_paths.as_slice(),
711            predicate_sql.as_deref(),
712            schema,
713        )?;
714    } else {
715        reject_duplicate_expression_index(key_items.as_slice(), predicate_sql.as_deref(), schema)?;
716    }
717    let candidate_index = candidate_index_snapshot(
718        statement.name.as_str(),
719        key_items.as_slice(),
720        predicate_sql.as_deref(),
721        statement.uniqueness,
722        schema,
723        index_store_path,
724    )?;
725
726    Ok(BoundSqlDdlRequest {
727        statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
728            index_name: statement.name.clone(),
729            entity_name: entity_name.to_string(),
730            key_items,
731            field_paths,
732            candidate_index,
733        }),
734    })
735}
736
737fn bind_drop_index_statement(
738    statement: &SqlDropIndexStatement,
739    accepted_before: &AcceptedSchemaSnapshot,
740    schema: &SchemaInfo,
741) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
742    let entity_name = schema
743        .entity_name()
744        .ok_or(SqlDdlBindError::MissingEntityName)?;
745
746    if let Some(sql_entity) = statement.entity.as_deref()
747        && !identifiers_tail_match(sql_entity, entity_name)
748    {
749        return Err(SqlDdlBindError::EntityMismatch {
750            sql_entity: sql_entity.to_string(),
751            expected_entity: entity_name.to_string(),
752        });
753    }
754    let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
755        accepted_before,
756        &statement.name,
757    )
758    .map_err(|error| match error {
759        SchemaDdlIndexDropCandidateError::Generated => {
760            SqlDdlBindError::GeneratedIndexDropRejected {
761                index_name: statement.name.clone(),
762            }
763        }
764        SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
765            entity_name: entity_name.to_string(),
766            index_name: statement.name.clone(),
767        },
768        SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
769            index_name: statement.name.clone(),
770        },
771    });
772    let (dropped_index, field_path) = match drop_candidate {
773        Ok((dropped_index, field_path)) => (dropped_index, field_path),
774        Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
775            return Ok(BoundSqlDdlRequest {
776                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
777                    mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
778                    index_name: statement.name.clone(),
779                    entity_name: entity_name.to_string(),
780                    target_store: "-".to_string(),
781                    field_path: Vec::new(),
782                }),
783            });
784        }
785        Err(error) => return Err(error),
786    };
787    Ok(BoundSqlDdlRequest {
788        statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
789            index_name: statement.name.clone(),
790            entity_name: entity_name.to_string(),
791            dropped_index,
792            field_path,
793        }),
794    })
795}
796
797fn bind_alter_table_add_column_statement(
798    statement: &SqlAlterTableAddColumnStatement,
799    accepted_before: &AcceptedSchemaSnapshot,
800    schema: &SchemaInfo,
801) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
802    let entity_name = schema
803        .entity_name()
804        .ok_or(SqlDdlBindError::MissingEntityName)?;
805
806    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
807        return Err(SqlDdlBindError::EntityMismatch {
808            sql_entity: statement.entity.clone(),
809            expected_entity: entity_name.to_string(),
810        });
811    }
812
813    if schema
814        .field_nullable(statement.column_name.as_str())
815        .is_some()
816    {
817        return Err(SqlDdlBindError::DuplicateColumn {
818            entity_name: entity_name.to_string(),
819            column_name: statement.column_name.clone(),
820        });
821    }
822
823    let (kind, storage_decode, leaf_codec) = persisted_field_contract_for_sql_column_type(
824        statement.column_type.as_str(),
825    )
826    .ok_or_else(|| SqlDdlBindError::UnsupportedAlterTableAddColumnType {
827        entity_name: entity_name.to_string(),
828        column_name: statement.column_name.clone(),
829        column_type: statement.column_type.clone(),
830    })?;
831    let default = schema_field_default_for_sql_default(
832        entity_name,
833        statement.column_name.as_str(),
834        statement.default.as_ref(),
835        &kind,
836        statement.nullable,
837        storage_decode,
838        leaf_codec,
839    )?;
840    if !statement.nullable && default.is_none() {
841        return Err(SqlDdlBindError::UnsupportedAlterTableAddColumnNotNull {
842            entity_name: entity_name.to_string(),
843            column_name: statement.column_name.clone(),
844        });
845    }
846    let field = PersistedFieldSnapshot::new_with_write_policy_and_origin(
847        next_sql_ddl_field_id(accepted_before),
848        statement.column_name.clone(),
849        next_sql_ddl_field_slot(accepted_before),
850        kind,
851        Vec::new(),
852        statement.nullable,
853        default,
854        SchemaFieldWritePolicy::from_model_policies(None, None),
855        PersistedFieldOrigin::SqlDdl,
856        storage_decode,
857        leaf_codec,
858    );
859
860    Ok(BoundSqlDdlRequest {
861        statement: BoundSqlDdlStatement::AddColumn(BoundSqlAddColumnRequest {
862            entity_name: entity_name.to_string(),
863            field,
864        }),
865    })
866}
867
868fn alter_table_alter_column_bind_error(
869    statement: &SqlAlterTableAlterColumnStatement,
870    schema: &SchemaInfo,
871) -> SqlDdlBindError {
872    let Some(entity_name) = schema.entity_name() else {
873        return SqlDdlBindError::MissingEntityName;
874    };
875
876    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
877        return SqlDdlBindError::EntityMismatch {
878            sql_entity: statement.entity.clone(),
879            expected_entity: entity_name.to_string(),
880        };
881    }
882
883    if schema
884        .field_nullable(statement.column_name.as_str())
885        .is_none()
886    {
887        return SqlDdlBindError::UnknownColumn {
888            entity_name: entity_name.to_string(),
889            column_name: statement.column_name.clone(),
890        };
891    }
892
893    SqlDdlBindError::UnsupportedAlterTableAlterColumn {
894        entity_name: entity_name.to_string(),
895        column_name: statement.column_name.clone(),
896        action: statement.action.label().to_string(),
897    }
898}
899
900fn schema_field_default_for_sql_default(
901    entity_name: &str,
902    column_name: &str,
903    default: Option<&crate::value::Value>,
904    kind: &PersistedFieldKind,
905    nullable: bool,
906    storage_decode: FieldStorageDecode,
907    leaf_codec: LeafCodec,
908) -> Result<SchemaFieldDefault, SqlDdlBindError> {
909    let Some(default) = default else {
910        return Ok(SchemaFieldDefault::None);
911    };
912    if matches!(default, crate::value::Value::Null) {
913        return Err(SqlDdlBindError::InvalidAlterTableAddColumnDefault {
914            entity_name: entity_name.to_string(),
915            column_name: column_name.to_string(),
916            detail: "NULL cannot be used as an accepted database default".to_string(),
917        });
918    }
919
920    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(kind, default)
921        .unwrap_or_else(|| default.clone());
922    let contract =
923        AcceptedFieldDecodeContract::new(column_name, kind, nullable, storage_decode, leaf_codec);
924    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
925        |error| SqlDdlBindError::InvalidAlterTableAddColumnDefault {
926            entity_name: entity_name.to_string(),
927            column_name: column_name.to_string(),
928            detail: error.to_string(),
929        },
930    )?;
931
932    Ok(SchemaFieldDefault::SlotPayload(payload))
933}
934
935fn next_sql_ddl_field_id(accepted_before: &AcceptedSchemaSnapshot) -> FieldId {
936    let next = accepted_before
937        .persisted_snapshot()
938        .fields()
939        .iter()
940        .map(|field| field.id().get())
941        .max()
942        .unwrap_or(0)
943        .checked_add(1)
944        .expect("accepted field IDs should not be exhausted");
945
946    FieldId::new(next)
947}
948
949fn next_sql_ddl_field_slot(accepted_before: &AcceptedSchemaSnapshot) -> SchemaFieldSlot {
950    let next = accepted_before
951        .persisted_snapshot()
952        .row_layout()
953        .field_to_slot()
954        .iter()
955        .map(|(_, slot)| slot.get())
956        .max()
957        .unwrap_or(0)
958        .checked_add(1)
959        .expect("accepted row slots should not be exhausted");
960
961    SchemaFieldSlot::new(next)
962}
963
964fn persisted_field_contract_for_sql_column_type(
965    column_type: &str,
966) -> Option<(PersistedFieldKind, FieldStorageDecode, LeafCodec)> {
967    let normalized = column_type.trim().to_ascii_lowercase();
968    match normalized.as_str() {
969        "bool" | "boolean" => Some((
970            PersistedFieldKind::Bool,
971            FieldStorageDecode::ByKind,
972            LeafCodec::Scalar(ScalarCodec::Bool),
973        )),
974        "int" | "integer" => Some((
975            PersistedFieldKind::Int,
976            FieldStorageDecode::ByKind,
977            LeafCodec::Scalar(ScalarCodec::Int64),
978        )),
979        "nat" | "natural" => Some((
980            PersistedFieldKind::Nat,
981            FieldStorageDecode::ByKind,
982            LeafCodec::Scalar(ScalarCodec::Nat64),
983        )),
984        "text" | "string" => Some((
985            PersistedFieldKind::Text { max_len: None },
986            FieldStorageDecode::ByKind,
987            LeafCodec::Scalar(ScalarCodec::Text),
988        )),
989        _ => None,
990    }
991}
992
993#[derive(Clone, Debug, Eq, PartialEq)]
994pub(in crate::db) enum BoundSqlDdlCreateIndexKey {
995    FieldPath(BoundSqlDdlFieldPath),
996    Expression(BoundSqlDdlExpressionKey),
997}
998
999///
1000/// BoundSqlDdlExpressionKey
1001///
1002/// Accepted expression-index key target for SQL DDL binding.
1003///
1004#[derive(Clone, Debug, Eq, PartialEq)]
1005pub(in crate::db) struct BoundSqlDdlExpressionKey {
1006    op: PersistedIndexExpressionOp,
1007    source: BoundSqlDdlFieldPath,
1008    canonical_sql: String,
1009}
1010
1011impl BoundSqlDdlExpressionKey {
1012    /// Return the accepted expression operation.
1013    #[must_use]
1014    pub(in crate::db) const fn op(&self) -> PersistedIndexExpressionOp {
1015        self.op
1016    }
1017
1018    /// Borrow the accepted source field path.
1019    #[must_use]
1020    pub(in crate::db) const fn source(&self) -> &BoundSqlDdlFieldPath {
1021        &self.source
1022    }
1023
1024    /// Borrow the SQL-facing canonical expression text.
1025    #[must_use]
1026    pub(in crate::db) const fn canonical_sql(&self) -> &str {
1027        self.canonical_sql.as_str()
1028    }
1029}
1030
1031fn bind_create_index_key_item(
1032    key_item: &SqlCreateIndexKeyItem,
1033    entity_name: &str,
1034    schema: &SchemaInfo,
1035) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1036    match key_item {
1037        SqlCreateIndexKeyItem::FieldPath(field_path) => {
1038            bind_create_index_field_path(field_path.as_str(), entity_name, schema)
1039                .map(BoundSqlDdlCreateIndexKey::FieldPath)
1040        }
1041        SqlCreateIndexKeyItem::Expression(expression) => {
1042            bind_create_index_expression_key(expression, entity_name, schema)
1043        }
1044    }
1045}
1046
1047fn bind_create_index_expression_key(
1048    expression: &SqlCreateIndexExpressionKey,
1049    entity_name: &str,
1050    schema: &SchemaInfo,
1051) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1052    let source = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
1053
1054    Ok(BoundSqlDdlCreateIndexKey::Expression(
1055        BoundSqlDdlExpressionKey {
1056            op: expression_op_from_sql_function(expression.function),
1057            source,
1058            canonical_sql: expression.canonical_sql(),
1059        },
1060    ))
1061}
1062
1063const fn expression_op_from_sql_function(
1064    function: crate::db::sql::parser::SqlCreateIndexExpressionFunction,
1065) -> PersistedIndexExpressionOp {
1066    match function {
1067        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Lower => {
1068            PersistedIndexExpressionOp::Lower
1069        }
1070        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Upper => {
1071            PersistedIndexExpressionOp::Upper
1072        }
1073        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Trim => {
1074            PersistedIndexExpressionOp::Trim
1075        }
1076    }
1077}
1078
1079fn key_items_are_field_path_only(key_items: &[BoundSqlDdlCreateIndexKey]) -> bool {
1080    key_items
1081        .iter()
1082        .all(|key_item| matches!(key_item, BoundSqlDdlCreateIndexKey::FieldPath(_)))
1083}
1084
1085fn create_index_field_path_report_items(
1086    key_items: &[BoundSqlDdlCreateIndexKey],
1087) -> Vec<BoundSqlDdlFieldPath> {
1088    key_items
1089        .iter()
1090        .map(|key_item| match key_item {
1091            BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.clone(),
1092            BoundSqlDdlCreateIndexKey::Expression(expression) => expression.source().clone(),
1093        })
1094        .collect()
1095}
1096
1097fn bind_create_index_field_path(
1098    field_path: &str,
1099    entity_name: &str,
1100    schema: &SchemaInfo,
1101) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
1102    let mut path = field_path
1103        .split('.')
1104        .map(str::trim)
1105        .filter(|segment| !segment.is_empty());
1106    let Some(root) = path.next() else {
1107        return Err(SqlDdlBindError::UnknownFieldPath {
1108            entity_name: entity_name.to_string(),
1109            field_path: field_path.to_string(),
1110        });
1111    };
1112    let segments = path.map(str::to_string).collect::<Vec<_>>();
1113
1114    let capabilities = if segments.is_empty() {
1115        schema.sql_capabilities(root)
1116    } else {
1117        schema.nested_sql_capabilities(root, segments.as_slice())
1118    }
1119    .ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
1120        entity_name: entity_name.to_string(),
1121        field_path: field_path.to_string(),
1122    })?;
1123
1124    if !capabilities.orderable() {
1125        return Err(SqlDdlBindError::FieldPathNotIndexable {
1126            field_path: field_path.to_string(),
1127        });
1128    }
1129
1130    let mut accepted_path = Vec::with_capacity(segments.len() + 1);
1131    accepted_path.push(root.to_string());
1132    accepted_path.extend(segments.iter().cloned());
1133
1134    Ok(BoundSqlDdlFieldPath {
1135        root: root.to_string(),
1136        segments,
1137        accepted_path,
1138    })
1139}
1140
1141fn find_field_path_index_by_name<'a>(
1142    schema: &'a SchemaInfo,
1143    index_name: &str,
1144) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
1145    schema
1146        .field_path_indexes()
1147        .iter()
1148        .find(|index| index.name() == index_name)
1149}
1150
1151fn existing_field_path_index_matches_request(
1152    index: &crate::db::schema::SchemaIndexInfo,
1153    field_paths: &[BoundSqlDdlFieldPath],
1154    predicate_sql: Option<&str>,
1155    uniqueness: SqlCreateIndexUniqueness,
1156) -> bool {
1157    let fields = index.fields();
1158
1159    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1160        && index.predicate_sql() == predicate_sql
1161        && fields.len() == field_paths.len()
1162        && fields
1163            .iter()
1164            .zip(field_paths)
1165            .all(|(field, requested)| field.path() == requested.accepted_path())
1166}
1167
1168fn find_expression_index_by_name<'a>(
1169    schema: &'a SchemaInfo,
1170    index_name: &str,
1171) -> Option<&'a SchemaExpressionIndexInfo> {
1172    schema
1173        .expression_indexes()
1174        .iter()
1175        .find(|index| index.name() == index_name)
1176}
1177
1178fn existing_expression_index_matches_request(
1179    index: &SchemaExpressionIndexInfo,
1180    key_items: &[BoundSqlDdlCreateIndexKey],
1181    predicate_sql: Option<&str>,
1182    uniqueness: SqlCreateIndexUniqueness,
1183) -> bool {
1184    let existing_key_items = index.key_items();
1185
1186    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1187        && index.predicate_sql() == predicate_sql
1188        && existing_key_items.len() == key_items.len()
1189        && existing_key_items
1190            .iter()
1191            .zip(key_items)
1192            .all(existing_expression_key_item_matches_request)
1193}
1194
1195fn existing_expression_key_item_matches_request(
1196    existing: (
1197        &SchemaExpressionIndexKeyItemInfo,
1198        &BoundSqlDdlCreateIndexKey,
1199    ),
1200) -> bool {
1201    let (existing, requested) = existing;
1202    match (existing, requested) {
1203        (
1204            SchemaExpressionIndexKeyItemInfo::FieldPath(existing),
1205            BoundSqlDdlCreateIndexKey::FieldPath(requested),
1206        ) => existing.path() == requested.accepted_path(),
1207        (
1208            SchemaExpressionIndexKeyItemInfo::Expression(existing),
1209            BoundSqlDdlCreateIndexKey::Expression(requested),
1210        ) => existing_expression_component_matches_request(
1211            existing.op(),
1212            existing.source().path(),
1213            existing.canonical_text(),
1214            requested,
1215        ),
1216        _ => false,
1217    }
1218}
1219
1220fn existing_expression_component_matches_request(
1221    existing_op: PersistedIndexExpressionOp,
1222    existing_path: &[String],
1223    existing_canonical_text: &str,
1224    requested: &BoundSqlDdlExpressionKey,
1225) -> bool {
1226    let requested_path = requested.source().accepted_path();
1227    let requested_canonical_text = format!("expr:v1:{}", requested.canonical_sql());
1228
1229    existing_op == requested.op()
1230        && existing_path == requested_path
1231        && existing_canonical_text == requested_canonical_text
1232}
1233
1234fn reject_duplicate_expression_index(
1235    key_items: &[BoundSqlDdlCreateIndexKey],
1236    predicate_sql: Option<&str>,
1237    schema: &SchemaInfo,
1238) -> Result<(), SqlDdlBindError> {
1239    let Some(existing_index) = schema.expression_indexes().iter().find(|index| {
1240        existing_expression_index_matches_request(
1241            index,
1242            key_items,
1243            predicate_sql,
1244            if index.unique() {
1245                SqlCreateIndexUniqueness::Unique
1246            } else {
1247                SqlCreateIndexUniqueness::NonUnique
1248            },
1249        )
1250    }) else {
1251        return Ok(());
1252    };
1253
1254    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1255        field_path: ddl_key_item_report(key_items).join(","),
1256        existing_index: existing_index.name().to_string(),
1257    })
1258}
1259
1260fn reject_duplicate_field_path_index(
1261    field_paths: &[BoundSqlDdlFieldPath],
1262    predicate_sql: Option<&str>,
1263    schema: &SchemaInfo,
1264) -> Result<(), SqlDdlBindError> {
1265    let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
1266        let fields = index.fields();
1267        index.predicate_sql() == predicate_sql
1268            && fields.len() == field_paths.len()
1269            && fields
1270                .iter()
1271                .zip(field_paths)
1272                .all(|(field, requested)| field.path() == requested.accepted_path())
1273    }) else {
1274        return Ok(());
1275    };
1276
1277    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1278        field_path: ddl_field_path_report(field_paths).join(","),
1279        existing_index: existing_index.name().to_string(),
1280    })
1281}
1282
1283fn candidate_index_snapshot(
1284    index_name: &str,
1285    key_items: &[BoundSqlDdlCreateIndexKey],
1286    predicate_sql: Option<&str>,
1287    uniqueness: SqlCreateIndexUniqueness,
1288    schema: &SchemaInfo,
1289    index_store_path: &'static str,
1290) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
1291    let key = if key_items_are_field_path_only(key_items) {
1292        PersistedIndexKeySnapshot::FieldPath(
1293            key_items
1294                .iter()
1295                .map(|key_item| {
1296                    let BoundSqlDdlCreateIndexKey::FieldPath(field_path) = key_item else {
1297                        unreachable!("field-path-only index checked before field-path lowering");
1298                    };
1299
1300                    accepted_index_field_path_snapshot(schema, field_path)
1301                })
1302                .collect::<Result<Vec<_>, _>>()?,
1303        )
1304    } else {
1305        PersistedIndexKeySnapshot::Items(
1306            key_items
1307                .iter()
1308                .map(|key_item| match key_item {
1309                    BoundSqlDdlCreateIndexKey::FieldPath(field_path) => {
1310                        accepted_index_field_path_snapshot(schema, field_path)
1311                            .map(PersistedIndexKeyItemSnapshot::FieldPath)
1312                    }
1313                    BoundSqlDdlCreateIndexKey::Expression(expression) => {
1314                        accepted_index_expression_snapshot(schema, expression)
1315                    }
1316                })
1317                .collect::<Result<Vec<_>, _>>()?,
1318        )
1319    };
1320
1321    Ok(PersistedIndexSnapshot::new_sql_ddl(
1322        schema.next_secondary_index_ordinal(),
1323        index_name.to_string(),
1324        index_store_path.to_string(),
1325        matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
1326        key,
1327        predicate_sql.map(str::to_string),
1328    ))
1329}
1330
1331fn accepted_index_field_path_snapshot(
1332    schema: &SchemaInfo,
1333    field_path: &BoundSqlDdlFieldPath,
1334) -> Result<crate::db::schema::PersistedIndexFieldPathSnapshot, SqlDdlBindError> {
1335    schema
1336        .accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
1337        .ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
1338            field_path: field_path.accepted_path().join("."),
1339        })
1340}
1341
1342fn accepted_index_expression_snapshot(
1343    schema: &SchemaInfo,
1344    expression: &BoundSqlDdlExpressionKey,
1345) -> Result<PersistedIndexKeyItemSnapshot, SqlDdlBindError> {
1346    let source = accepted_index_field_path_snapshot(schema, expression.source())?;
1347    let Some(output_kind) = expression_output_kind(expression.op(), source.kind()) else {
1348        return Err(SqlDdlBindError::FieldPathNotIndexable {
1349            field_path: expression.source().accepted_path().join("."),
1350        });
1351    };
1352
1353    Ok(PersistedIndexKeyItemSnapshot::Expression(Box::new(
1354        PersistedIndexExpressionSnapshot::new(
1355            expression.op(),
1356            source.clone(),
1357            source.kind().clone(),
1358            output_kind,
1359            format!("expr:v1:{}", expression.canonical_sql()),
1360        ),
1361    )))
1362}
1363
1364fn expression_output_kind(
1365    op: PersistedIndexExpressionOp,
1366    source_kind: &PersistedFieldKind,
1367) -> Option<PersistedFieldKind> {
1368    match op {
1369        PersistedIndexExpressionOp::Lower
1370        | PersistedIndexExpressionOp::Upper
1371        | PersistedIndexExpressionOp::Trim
1372        | PersistedIndexExpressionOp::LowerTrim => {
1373            if matches!(source_kind, PersistedFieldKind::Text { .. }) {
1374                Some(source_kind.clone())
1375            } else {
1376                None
1377            }
1378        }
1379        PersistedIndexExpressionOp::Date => {
1380            if matches!(
1381                source_kind,
1382                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1383            ) {
1384                Some(PersistedFieldKind::Date)
1385            } else {
1386                None
1387            }
1388        }
1389        PersistedIndexExpressionOp::Year
1390        | PersistedIndexExpressionOp::Month
1391        | PersistedIndexExpressionOp::Day => {
1392            if matches!(
1393                source_kind,
1394                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1395            ) {
1396                Some(PersistedFieldKind::Int)
1397            } else {
1398                None
1399            }
1400        }
1401    }
1402}
1403
1404fn validated_create_index_predicate_sql(
1405    predicate_sql: Option<&str>,
1406    schema: &SchemaInfo,
1407) -> Result<Option<String>, SqlDdlBindError> {
1408    let Some(predicate_sql) = predicate_sql else {
1409        return Ok(None);
1410    };
1411    let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
1412        SqlDdlBindError::InvalidFilteredIndexPredicate {
1413            detail: error.to_string(),
1414        }
1415    })?;
1416    validate_predicate(schema, &predicate).map_err(|error| {
1417        SqlDdlBindError::InvalidFilteredIndexPredicate {
1418            detail: error.to_string(),
1419        }
1420    })?;
1421
1422    Ok(Some(predicate_sql.to_string()))
1423}
1424
1425fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
1426    match field_paths {
1427        [field_path] => field_path.accepted_path().to_vec(),
1428        _ => vec![
1429            field_paths
1430                .iter()
1431                .map(|field_path| field_path.accepted_path().join("."))
1432                .collect::<Vec<_>>()
1433                .join(","),
1434        ],
1435    }
1436}
1437
1438fn ddl_key_item_report(key_items: &[BoundSqlDdlCreateIndexKey]) -> Vec<String> {
1439    match key_items {
1440        [key_item] => vec![ddl_key_item_text(key_item)],
1441        _ => vec![
1442            key_items
1443                .iter()
1444                .map(ddl_key_item_text)
1445                .collect::<Vec<_>>()
1446                .join(","),
1447        ],
1448    }
1449}
1450
1451fn ddl_key_item_text(key_item: &BoundSqlDdlCreateIndexKey) -> String {
1452    match key_item {
1453        BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.accepted_path().join("."),
1454        BoundSqlDdlCreateIndexKey::Expression(expression) => expression.canonical_sql().to_string(),
1455    }
1456}
1457
1458/// Lower one bound SQL DDL request through schema mutation admission.
1459pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
1460    request: &BoundSqlDdlRequest,
1461) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
1462    match request.statement() {
1463        BoundSqlDdlStatement::AddColumn(add) => {
1464            Ok(admit_sql_ddl_field_addition_candidate(add.field()))
1465        }
1466        BoundSqlDdlStatement::CreateIndex(create) => {
1467            if create.candidate_index().key().is_field_path_only() {
1468                admit_sql_ddl_field_path_index_candidate(create.candidate_index())
1469            } else {
1470                admit_sql_ddl_expression_index_candidate(create.candidate_index())
1471            }
1472        }
1473        BoundSqlDdlStatement::DropIndex(drop) => {
1474            admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
1475        }
1476        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1477    }
1478    .map_err(SqlDdlLoweringError::MutationAdmission)
1479}
1480
1481/// Derive the accepted-after schema snapshot for one bound SQL DDL request.
1482pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
1483    accepted_before: &AcceptedSchemaSnapshot,
1484    request: &BoundSqlDdlRequest,
1485) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
1486    match request.statement() {
1487        BoundSqlDdlStatement::AddColumn(add) => {
1488            derive_sql_ddl_field_addition_accepted_after(accepted_before, add.field().clone())
1489        }
1490        BoundSqlDdlStatement::CreateIndex(create) => {
1491            if create.candidate_index().key().is_field_path_only() {
1492                derive_sql_ddl_field_path_index_accepted_after(
1493                    accepted_before,
1494                    create.candidate_index().clone(),
1495                )
1496            } else {
1497                derive_sql_ddl_expression_index_accepted_after(
1498                    accepted_before,
1499                    create.candidate_index().clone(),
1500                )
1501            }
1502        }
1503        BoundSqlDdlStatement::DropIndex(drop) => {
1504            derive_sql_ddl_secondary_index_drop_accepted_after(
1505                accepted_before,
1506                drop.dropped_index(),
1507            )
1508        }
1509        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1510    }
1511    .map_err(SqlDdlLoweringError::MutationAdmission)
1512}
1513
1514fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
1515    match bound.statement() {
1516        BoundSqlDdlStatement::AddColumn(add) => SqlDdlPreparationReport {
1517            mutation_kind: if add.field().default().is_none() {
1518                SqlDdlMutationKind::AddNullableField
1519            } else {
1520                SqlDdlMutationKind::AddDefaultedField
1521            },
1522            target_index: add.field().name().to_string(),
1523            target_store: add.entity_name().to_string(),
1524            field_path: vec![add.field().name().to_string()],
1525            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1526            rows_scanned: 0,
1527            index_keys_written: 0,
1528        },
1529        BoundSqlDdlStatement::CreateIndex(create) => {
1530            let target = create.candidate_index();
1531
1532            SqlDdlPreparationReport {
1533                mutation_kind: if target.key().is_field_path_only() {
1534                    SqlDdlMutationKind::AddFieldPathIndex
1535                } else {
1536                    SqlDdlMutationKind::AddExpressionIndex
1537                },
1538                target_index: target.name().to_string(),
1539                target_store: target.store().to_string(),
1540                field_path: ddl_key_item_report(create.key_items()),
1541                execution_status: SqlDdlExecutionStatus::PreparedOnly,
1542                rows_scanned: 0,
1543                index_keys_written: 0,
1544            }
1545        }
1546        BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
1547            mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
1548            target_index: drop.index_name().to_string(),
1549            target_store: drop.dropped_index().store().to_string(),
1550            field_path: drop.field_path().to_vec(),
1551            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1552            rows_scanned: 0,
1553            index_keys_written: 0,
1554        },
1555        BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
1556            mutation_kind: no_op.mutation_kind(),
1557            target_index: no_op.index_name().to_string(),
1558            target_store: no_op.target_store().to_string(),
1559            field_path: no_op.field_path().to_vec(),
1560            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1561            rows_scanned: 0,
1562            index_keys_written: 0,
1563        },
1564    }
1565}