Skip to main content

icydb_core/db/sql/
ddl.rs

1//! Module: db::sql::ddl
2//! Responsibility: bind parsed SQL DDL to accepted schema catalog contracts.
3//! Does not own: mutation planning, physical index rebuilds, or SQL execution.
4//! Boundary: translates parser-owned DDL syntax into catalog-native requests.
5
6#![allow(
7    dead_code,
8    reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
9)]
10
11use crate::db::{
12    data::encode_runtime_value_for_accepted_field_contract,
13    predicate::parse_sql_predicate,
14    query::predicate::validate_predicate,
15    schema::{
16        AcceptedFieldDecodeContract, AcceptedSchemaSnapshot, FieldId, PersistedFieldKind,
17        PersistedFieldOrigin, PersistedFieldSnapshot, PersistedIndexExpressionOp,
18        PersistedIndexExpressionSnapshot, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
19        PersistedIndexSnapshot, SchemaDdlAcceptedSnapshotDerivation,
20        SchemaDdlIndexDropCandidateError, SchemaDdlMutationAdmission,
21        SchemaDdlMutationAdmissionError, SchemaExpressionIndexInfo,
22        SchemaExpressionIndexKeyItemInfo, SchemaFieldDefault, SchemaFieldSlot,
23        SchemaFieldWritePolicy, SchemaInfo, admit_sql_ddl_expression_index_candidate,
24        admit_sql_ddl_field_addition_candidate, admit_sql_ddl_field_path_index_candidate,
25        admit_sql_ddl_secondary_index_drop_candidate,
26        canonicalize_strict_sql_literal_for_persisted_kind,
27        derive_sql_ddl_expression_index_accepted_after,
28        derive_sql_ddl_field_addition_accepted_after,
29        derive_sql_ddl_field_path_index_accepted_after,
30        derive_sql_ddl_secondary_index_drop_accepted_after,
31        resolve_sql_ddl_secondary_index_drop_candidate,
32    },
33    sql::{
34        identifier::identifiers_tail_match,
35        parser::{
36            SqlAlterTableAddColumnStatement, SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem,
37            SqlCreateIndexStatement, SqlCreateIndexUniqueness, SqlDdlStatement,
38            SqlDropIndexStatement, SqlStatement,
39        },
40    },
41};
42use crate::model::field::{FieldStorageDecode, LeafCodec, ScalarCodec};
43use thiserror::Error as ThisError;
44
45///
46/// PreparedSqlDdlCommand
47///
48/// Fully prepared SQL DDL command. This is intentionally not executable yet:
49/// it packages the accepted-catalog binding, accepted-after derivation, and
50/// schema mutation admission proof for the future execution boundary.
51///
52#[derive(Clone, Debug, Eq, PartialEq)]
53pub(in crate::db) struct PreparedSqlDdlCommand {
54    bound: BoundSqlDdlRequest,
55    derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
56    report: SqlDdlPreparationReport,
57}
58
59impl PreparedSqlDdlCommand {
60    /// Borrow the accepted-catalog-bound DDL request.
61    #[must_use]
62    pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
63        &self.bound
64    }
65
66    /// Borrow the accepted-after derivation proof.
67    #[must_use]
68    pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
69        self.derivation.as_ref()
70    }
71
72    /// Borrow the developer-facing preparation report.
73    #[must_use]
74    pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
75        &self.report
76    }
77
78    /// Return whether this prepared command needs schema or storage mutation.
79    #[must_use]
80    pub(in crate::db) const fn mutates_schema(&self) -> bool {
81        self.derivation.is_some()
82    }
83}
84
85///
86/// SqlDdlPreparationReport
87///
88/// Compact report for a DDL command that has passed all pre-execution
89/// frontend and schema-mutation checks.
90///
91#[derive(Clone, Debug, Eq, PartialEq)]
92pub struct SqlDdlPreparationReport {
93    mutation_kind: SqlDdlMutationKind,
94    target_index: String,
95    target_store: String,
96    field_path: Vec<String>,
97    execution_status: SqlDdlExecutionStatus,
98    rows_scanned: usize,
99    index_keys_written: usize,
100}
101
102impl SqlDdlPreparationReport {
103    /// Return the prepared DDL mutation kind.
104    #[must_use]
105    pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
106        self.mutation_kind
107    }
108
109    /// Borrow the target accepted index name.
110    #[must_use]
111    pub const fn target_index(&self) -> &str {
112        self.target_index.as_str()
113    }
114
115    /// Borrow the target accepted index store path.
116    #[must_use]
117    pub const fn target_store(&self) -> &str {
118        self.target_store.as_str()
119    }
120
121    /// Borrow the target field path.
122    #[must_use]
123    pub const fn field_path(&self) -> &[String] {
124        self.field_path.as_slice()
125    }
126
127    /// Return the execution status captured by this DDL report.
128    #[must_use]
129    pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
130        self.execution_status
131    }
132
133    /// Return rows scanned by DDL execution.
134    #[must_use]
135    pub const fn rows_scanned(&self) -> usize {
136        self.rows_scanned
137    }
138
139    /// Return index keys written by DDL execution.
140    #[must_use]
141    pub const fn index_keys_written(&self) -> usize {
142        self.index_keys_written
143    }
144
145    pub(in crate::db) const fn with_execution_status(
146        mut self,
147        execution_status: SqlDdlExecutionStatus,
148    ) -> Self {
149        self.execution_status = execution_status;
150        self
151    }
152
153    pub(in crate::db) const fn with_execution_metrics(
154        mut self,
155        rows_scanned: usize,
156        index_keys_written: usize,
157    ) -> Self {
158        self.rows_scanned = rows_scanned;
159        self.index_keys_written = index_keys_written;
160        self
161    }
162}
163
164///
165/// SqlDdlMutationKind
166///
167/// Developer-facing SQL DDL mutation kind.
168///
169#[derive(Clone, Copy, Debug, Eq, PartialEq)]
170pub enum SqlDdlMutationKind {
171    AddDefaultedField,
172    AddNullableField,
173    AddFieldPathIndex,
174    AddExpressionIndex,
175    DropSecondaryIndex,
176}
177
178impl SqlDdlMutationKind {
179    /// Return the stable diagnostic label for this DDL mutation kind.
180    #[must_use]
181    pub const fn as_str(self) -> &'static str {
182        match self {
183            Self::AddDefaultedField => "add_defaulted_field",
184            Self::AddNullableField => "add_nullable_field",
185            Self::AddFieldPathIndex => "add_field_path_index",
186            Self::AddExpressionIndex => "add_expression_index",
187            Self::DropSecondaryIndex => "drop_secondary_index",
188        }
189    }
190}
191
192///
193/// SqlDdlExecutionStatus
194///
195/// SQL DDL execution state at the current boundary.
196///
197#[derive(Clone, Copy, Debug, Eq, PartialEq)]
198pub enum SqlDdlExecutionStatus {
199    PreparedOnly,
200    Published,
201    NoOp,
202}
203
204impl SqlDdlExecutionStatus {
205    /// Return the stable diagnostic label for this execution status.
206    #[must_use]
207    pub const fn as_str(self) -> &'static str {
208        match self {
209            Self::PreparedOnly => "prepared_only",
210            Self::Published => "published",
211            Self::NoOp => "no_op",
212        }
213    }
214}
215
216///
217/// BoundSqlDdlRequest
218///
219/// Accepted-catalog SQL DDL request after parser syntax has been resolved
220/// against one runtime schema snapshot.
221///
222#[derive(Clone, Debug, Eq, PartialEq)]
223pub(in crate::db) struct BoundSqlDdlRequest {
224    statement: BoundSqlDdlStatement,
225}
226
227impl BoundSqlDdlRequest {
228    /// Borrow the bound statement payload.
229    #[must_use]
230    pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
231        &self.statement
232    }
233}
234
235///
236/// BoundSqlDdlStatement
237///
238/// Catalog-resolved DDL statement vocabulary.
239///
240#[derive(Clone, Debug, Eq, PartialEq)]
241pub(in crate::db) enum BoundSqlDdlStatement {
242    AddColumn(BoundSqlAddColumnRequest),
243    CreateIndex(BoundSqlCreateIndexRequest),
244    DropIndex(BoundSqlDropIndexRequest),
245    NoOp(BoundSqlDdlNoOpRequest),
246}
247
248///
249/// BoundSqlAddColumnRequest
250///
251/// Catalog-resolved additive field DDL request.
252///
253#[derive(Clone, Debug, Eq, PartialEq)]
254pub(in crate::db) struct BoundSqlAddColumnRequest {
255    entity_name: String,
256    field: PersistedFieldSnapshot,
257}
258
259impl BoundSqlAddColumnRequest {
260    /// Borrow the accepted entity name.
261    #[must_use]
262    pub(in crate::db) const fn entity_name(&self) -> &str {
263        self.entity_name.as_str()
264    }
265
266    /// Borrow the accepted DDL-owned field snapshot to publish.
267    #[must_use]
268    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
269        &self.field
270    }
271}
272
273///
274/// BoundSqlDdlNoOpRequest
275///
276/// Catalog-resolved idempotent DDL request that is already satisfied or absent.
277///
278#[derive(Clone, Debug, Eq, PartialEq)]
279pub(in crate::db) struct BoundSqlDdlNoOpRequest {
280    mutation_kind: SqlDdlMutationKind,
281    index_name: String,
282    entity_name: String,
283    target_store: String,
284    field_path: Vec<String>,
285}
286
287impl BoundSqlDdlNoOpRequest {
288    /// Return the user-facing mutation family this no-op belongs to.
289    #[must_use]
290    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
291        self.mutation_kind
292    }
293
294    /// Borrow the requested index name.
295    #[must_use]
296    pub(in crate::db) const fn index_name(&self) -> &str {
297        self.index_name.as_str()
298    }
299
300    /// Borrow the accepted entity name that owns this request.
301    #[must_use]
302    pub(in crate::db) const fn entity_name(&self) -> &str {
303        self.entity_name.as_str()
304    }
305
306    /// Borrow the accepted index store path, or `-` when no target exists.
307    #[must_use]
308    pub(in crate::db) const fn target_store(&self) -> &str {
309        self.target_store.as_str()
310    }
311
312    /// Borrow the target field path, empty when no target exists.
313    #[must_use]
314    pub(in crate::db) const fn field_path(&self) -> &[String] {
315        self.field_path.as_slice()
316    }
317}
318
319///
320/// BoundSqlCreateIndexRequest
321///
322/// Catalog-resolved request for adding one secondary index.
323///
324#[derive(Clone, Debug, Eq, PartialEq)]
325pub(in crate::db) struct BoundSqlCreateIndexRequest {
326    index_name: String,
327    entity_name: String,
328    key_items: Vec<BoundSqlDdlCreateIndexKey>,
329    field_paths: Vec<BoundSqlDdlFieldPath>,
330    candidate_index: PersistedIndexSnapshot,
331}
332
333impl BoundSqlCreateIndexRequest {
334    /// Borrow the requested index name.
335    #[must_use]
336    pub(in crate::db) const fn index_name(&self) -> &str {
337        self.index_name.as_str()
338    }
339
340    /// Borrow the accepted entity name that owns this request.
341    #[must_use]
342    pub(in crate::db) const fn entity_name(&self) -> &str {
343        self.entity_name.as_str()
344    }
345
346    /// Borrow the accepted field-path targets.
347    #[must_use]
348    pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
349        self.field_paths.as_slice()
350    }
351
352    /// Borrow the accepted key targets in DDL key order.
353    #[must_use]
354    pub(in crate::db) const fn key_items(&self) -> &[BoundSqlDdlCreateIndexKey] {
355        self.key_items.as_slice()
356    }
357
358    /// Borrow the candidate accepted index snapshot for mutation admission.
359    #[must_use]
360    pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
361        &self.candidate_index
362    }
363}
364
365///
366/// BoundSqlDropIndexRequest
367///
368/// Catalog-resolved request for dropping one DDL-published secondary index.
369///
370#[derive(Clone, Debug, Eq, PartialEq)]
371pub(in crate::db) struct BoundSqlDropIndexRequest {
372    index_name: String,
373    entity_name: String,
374    dropped_index: PersistedIndexSnapshot,
375    field_path: Vec<String>,
376}
377
378impl BoundSqlDropIndexRequest {
379    /// Borrow the requested index name.
380    #[must_use]
381    pub(in crate::db) const fn index_name(&self) -> &str {
382        self.index_name.as_str()
383    }
384
385    /// Borrow the accepted entity name that owns this request.
386    #[must_use]
387    pub(in crate::db) const fn entity_name(&self) -> &str {
388        self.entity_name.as_str()
389    }
390
391    /// Borrow the accepted index snapshot that will be removed.
392    #[must_use]
393    pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
394        &self.dropped_index
395    }
396
397    /// Borrow the dropped field-path target.
398    #[must_use]
399    pub(in crate::db) const fn field_path(&self) -> &[String] {
400        self.field_path.as_slice()
401    }
402}
403
404///
405/// BoundSqlDdlFieldPath
406///
407/// Accepted field-path target for SQL DDL binding.
408///
409#[derive(Clone, Debug, Eq, PartialEq)]
410pub(in crate::db) struct BoundSqlDdlFieldPath {
411    root: String,
412    segments: Vec<String>,
413    accepted_path: Vec<String>,
414}
415
416impl BoundSqlDdlFieldPath {
417    /// Borrow the top-level field name.
418    #[must_use]
419    pub(in crate::db) const fn root(&self) -> &str {
420        self.root.as_str()
421    }
422
423    /// Borrow nested path segments below the top-level field.
424    #[must_use]
425    pub(in crate::db) const fn segments(&self) -> &[String] {
426        self.segments.as_slice()
427    }
428
429    /// Borrow the full accepted field path used by index metadata.
430    #[must_use]
431    pub(in crate::db) const fn accepted_path(&self) -> &[String] {
432        self.accepted_path.as_slice()
433    }
434}
435
436///
437/// SqlDdlBindError
438///
439/// Typed fail-closed reasons for SQL DDL catalog binding.
440///
441#[derive(Debug, Eq, PartialEq, ThisError)]
442pub(in crate::db) enum SqlDdlBindError {
443    #[error("SQL DDL binder requires a DDL statement")]
444    NotDdl,
445
446    #[error("accepted schema does not expose an entity name")]
447    MissingEntityName,
448
449    #[error("accepted schema does not expose an entity path")]
450    MissingEntityPath,
451
452    #[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
453    EntityMismatch {
454        sql_entity: String,
455        expected_entity: String,
456    },
457
458    #[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
459    UnknownFieldPath {
460        entity_name: String,
461        field_path: String,
462    },
463
464    #[error("field path '{field_path}' is not indexable")]
465    FieldPathNotIndexable { field_path: String },
466
467    #[error("field path '{field_path}' depends on generated-only metadata")]
468    FieldPathNotAcceptedCatalogBacked { field_path: String },
469
470    #[error("invalid filtered index predicate: {detail}")]
471    InvalidFilteredIndexPredicate { detail: String },
472
473    #[error("index name '{index_name}' already exists in the accepted schema")]
474    DuplicateIndexName { index_name: String },
475
476    #[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
477    DuplicateFieldPathIndex {
478        field_path: String,
479        existing_index: String,
480    },
481
482    #[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
483    UnknownIndex {
484        entity_name: String,
485        index_name: String,
486    },
487
488    #[error(
489        "index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
490    )]
491    GeneratedIndexDropRejected { index_name: String },
492
493    #[error(
494        "index '{index_name}' is not a supported DDL-droppable secondary index; SQL DDL can currently drop only indexes created through SQL DDL"
495    )]
496    UnsupportedDropIndex { index_name: String },
497
498    #[error(
499        "SQL DDL ALTER TABLE ADD COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
500    )]
501    UnsupportedAlterTableAddColumn {
502        entity_name: String,
503        column_name: String,
504    },
505
506    #[error(
507        "SQL DDL ALTER TABLE ADD COLUMN DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
508    )]
509    InvalidAlterTableAddColumnDefault {
510        entity_name: String,
511        column_name: String,
512        detail: String,
513    },
514
515    #[error(
516        "SQL DDL ALTER TABLE ADD COLUMN NOT NULL is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
517    )]
518    UnsupportedAlterTableAddColumnNotNull {
519        entity_name: String,
520        column_name: String,
521    },
522
523    #[error("field '{column_name}' already exists in accepted entity '{entity_name}'")]
524    DuplicateColumn {
525        entity_name: String,
526        column_name: String,
527    },
528
529    #[error(
530        "SQL DDL ALTER TABLE ADD COLUMN type '{column_type}' is not supported yet for accepted entity '{entity_name}' column '{column_name}'"
531    )]
532    UnsupportedAlterTableAddColumnType {
533        entity_name: String,
534        column_name: String,
535        column_type: String,
536    },
537}
538
539///
540/// SqlDdlLoweringError
541///
542/// Typed fail-closed reasons while lowering bound DDL into schema mutation
543/// admission.
544///
545#[derive(Debug, Eq, PartialEq, ThisError)]
546pub(in crate::db) enum SqlDdlLoweringError {
547    #[error("SQL DDL lowering requires a supported DDL statement")]
548    UnsupportedStatement,
549
550    #[error("schema mutation admission rejected DDL candidate: {0:?}")]
551    MutationAdmission(SchemaDdlMutationAdmissionError),
552}
553
554///
555/// SqlDdlPrepareError
556///
557/// Typed fail-closed preparation errors for SQL DDL.
558///
559#[derive(Debug, Eq, PartialEq, ThisError)]
560pub(in crate::db) enum SqlDdlPrepareError {
561    #[error("{0}")]
562    Bind(#[from] SqlDdlBindError),
563
564    #[error("{0}")]
565    Lowering(#[from] SqlDdlLoweringError),
566}
567
568/// Prepare one parsed SQL DDL statement through every pre-execution proof.
569pub(in crate::db) fn prepare_sql_ddl_statement(
570    statement: &SqlStatement,
571    accepted_before: &AcceptedSchemaSnapshot,
572    schema: &SchemaInfo,
573    index_store_path: &'static str,
574) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
575    let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
576    let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
577        None
578    } else {
579        Some(derive_bound_sql_ddl_accepted_after(
580            accepted_before,
581            &bound,
582        )?)
583    };
584    let report = ddl_preparation_report(&bound);
585
586    Ok(PreparedSqlDdlCommand {
587        bound,
588        derivation,
589        report,
590    })
591}
592
593/// Bind one parsed SQL DDL statement against accepted catalog metadata.
594pub(in crate::db) fn bind_sql_ddl_statement(
595    statement: &SqlStatement,
596    accepted_before: &AcceptedSchemaSnapshot,
597    schema: &SchemaInfo,
598    index_store_path: &'static str,
599) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
600    let SqlStatement::Ddl(ddl) = statement else {
601        return Err(SqlDdlBindError::NotDdl);
602    };
603
604    match ddl {
605        SqlDdlStatement::CreateIndex(statement) => {
606            bind_create_index_statement(statement, schema, index_store_path)
607        }
608        SqlDdlStatement::DropIndex(statement) => {
609            bind_drop_index_statement(statement, accepted_before, schema)
610        }
611        SqlDdlStatement::AlterTableAddColumn(statement) => {
612            bind_alter_table_add_column_statement(statement, accepted_before, schema)
613        }
614    }
615}
616
617fn bind_create_index_statement(
618    statement: &SqlCreateIndexStatement,
619    schema: &SchemaInfo,
620    index_store_path: &'static str,
621) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
622    let entity_name = schema
623        .entity_name()
624        .ok_or(SqlDdlBindError::MissingEntityName)?;
625
626    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
627        return Err(SqlDdlBindError::EntityMismatch {
628            sql_entity: statement.entity.clone(),
629            expected_entity: entity_name.to_string(),
630        });
631    }
632
633    let key_items = statement
634        .key_items
635        .iter()
636        .map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
637        .collect::<Result<Vec<_>, _>>()?;
638    let field_paths = create_index_field_path_report_items(key_items.as_slice());
639    if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
640        if key_items_are_field_path_only(key_items.as_slice())
641            && statement.if_not_exists
642            && existing_field_path_index_matches_request(
643                existing_index,
644                field_paths.as_slice(),
645                statement.predicate_sql.as_deref(),
646                statement.uniqueness,
647            )
648        {
649            return Ok(BoundSqlDdlRequest {
650                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
651                    mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
652                    index_name: statement.name.clone(),
653                    entity_name: entity_name.to_string(),
654                    target_store: existing_index.store().to_string(),
655                    field_path: ddl_field_path_report(field_paths.as_slice()),
656                }),
657            });
658        }
659
660        return Err(SqlDdlBindError::DuplicateIndexName {
661            index_name: statement.name.clone(),
662        });
663    }
664    let predicate_sql =
665        validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
666    if let Some(existing_index) = find_expression_index_by_name(schema, statement.name.as_str()) {
667        if statement.if_not_exists
668            && existing_expression_index_matches_request(
669                existing_index,
670                key_items.as_slice(),
671                predicate_sql.as_deref(),
672                statement.uniqueness,
673            )
674        {
675            return Ok(BoundSqlDdlRequest {
676                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
677                    mutation_kind: SqlDdlMutationKind::AddExpressionIndex,
678                    index_name: statement.name.clone(),
679                    entity_name: entity_name.to_string(),
680                    target_store: existing_index.store().to_string(),
681                    field_path: ddl_key_item_report(key_items.as_slice()),
682                }),
683            });
684        }
685
686        return Err(SqlDdlBindError::DuplicateIndexName {
687            index_name: statement.name.clone(),
688        });
689    }
690    if key_items_are_field_path_only(key_items.as_slice()) {
691        reject_duplicate_field_path_index(
692            field_paths.as_slice(),
693            predicate_sql.as_deref(),
694            schema,
695        )?;
696    } else {
697        reject_duplicate_expression_index(key_items.as_slice(), predicate_sql.as_deref(), schema)?;
698    }
699    let candidate_index = candidate_index_snapshot(
700        statement.name.as_str(),
701        key_items.as_slice(),
702        predicate_sql.as_deref(),
703        statement.uniqueness,
704        schema,
705        index_store_path,
706    )?;
707
708    Ok(BoundSqlDdlRequest {
709        statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
710            index_name: statement.name.clone(),
711            entity_name: entity_name.to_string(),
712            key_items,
713            field_paths,
714            candidate_index,
715        }),
716    })
717}
718
719fn bind_drop_index_statement(
720    statement: &SqlDropIndexStatement,
721    accepted_before: &AcceptedSchemaSnapshot,
722    schema: &SchemaInfo,
723) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
724    let entity_name = schema
725        .entity_name()
726        .ok_or(SqlDdlBindError::MissingEntityName)?;
727
728    if let Some(sql_entity) = statement.entity.as_deref()
729        && !identifiers_tail_match(sql_entity, entity_name)
730    {
731        return Err(SqlDdlBindError::EntityMismatch {
732            sql_entity: sql_entity.to_string(),
733            expected_entity: entity_name.to_string(),
734        });
735    }
736    let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
737        accepted_before,
738        &statement.name,
739    )
740    .map_err(|error| match error {
741        SchemaDdlIndexDropCandidateError::Generated => {
742            SqlDdlBindError::GeneratedIndexDropRejected {
743                index_name: statement.name.clone(),
744            }
745        }
746        SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
747            entity_name: entity_name.to_string(),
748            index_name: statement.name.clone(),
749        },
750        SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
751            index_name: statement.name.clone(),
752        },
753    });
754    let (dropped_index, field_path) = match drop_candidate {
755        Ok((dropped_index, field_path)) => (dropped_index, field_path),
756        Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
757            return Ok(BoundSqlDdlRequest {
758                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
759                    mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
760                    index_name: statement.name.clone(),
761                    entity_name: entity_name.to_string(),
762                    target_store: "-".to_string(),
763                    field_path: Vec::new(),
764                }),
765            });
766        }
767        Err(error) => return Err(error),
768    };
769    Ok(BoundSqlDdlRequest {
770        statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
771            index_name: statement.name.clone(),
772            entity_name: entity_name.to_string(),
773            dropped_index,
774            field_path,
775        }),
776    })
777}
778
779fn bind_alter_table_add_column_statement(
780    statement: &SqlAlterTableAddColumnStatement,
781    accepted_before: &AcceptedSchemaSnapshot,
782    schema: &SchemaInfo,
783) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
784    let entity_name = schema
785        .entity_name()
786        .ok_or(SqlDdlBindError::MissingEntityName)?;
787
788    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
789        return Err(SqlDdlBindError::EntityMismatch {
790            sql_entity: statement.entity.clone(),
791            expected_entity: entity_name.to_string(),
792        });
793    }
794
795    if schema
796        .field_nullable(statement.column_name.as_str())
797        .is_some()
798    {
799        return Err(SqlDdlBindError::DuplicateColumn {
800            entity_name: entity_name.to_string(),
801            column_name: statement.column_name.clone(),
802        });
803    }
804
805    let (kind, storage_decode, leaf_codec) = persisted_field_contract_for_sql_column_type(
806        statement.column_type.as_str(),
807    )
808    .ok_or_else(|| SqlDdlBindError::UnsupportedAlterTableAddColumnType {
809        entity_name: entity_name.to_string(),
810        column_name: statement.column_name.clone(),
811        column_type: statement.column_type.clone(),
812    })?;
813    let default = schema_field_default_for_sql_default(
814        entity_name,
815        statement.column_name.as_str(),
816        statement.default.as_ref(),
817        &kind,
818        statement.nullable,
819        storage_decode,
820        leaf_codec,
821    )?;
822    if !statement.nullable && default.is_none() {
823        return Err(SqlDdlBindError::UnsupportedAlterTableAddColumnNotNull {
824            entity_name: entity_name.to_string(),
825            column_name: statement.column_name.clone(),
826        });
827    }
828    let field = PersistedFieldSnapshot::new_with_write_policy_and_origin(
829        next_sql_ddl_field_id(accepted_before),
830        statement.column_name.clone(),
831        next_sql_ddl_field_slot(accepted_before),
832        kind,
833        Vec::new(),
834        statement.nullable,
835        default,
836        SchemaFieldWritePolicy::from_model_policies(None, None),
837        PersistedFieldOrigin::SqlDdl,
838        storage_decode,
839        leaf_codec,
840    );
841
842    Ok(BoundSqlDdlRequest {
843        statement: BoundSqlDdlStatement::AddColumn(BoundSqlAddColumnRequest {
844            entity_name: entity_name.to_string(),
845            field,
846        }),
847    })
848}
849
850fn schema_field_default_for_sql_default(
851    entity_name: &str,
852    column_name: &str,
853    default: Option<&crate::value::Value>,
854    kind: &PersistedFieldKind,
855    nullable: bool,
856    storage_decode: FieldStorageDecode,
857    leaf_codec: LeafCodec,
858) -> Result<SchemaFieldDefault, SqlDdlBindError> {
859    let Some(default) = default else {
860        return Ok(SchemaFieldDefault::None);
861    };
862    if matches!(default, crate::value::Value::Null) {
863        return Err(SqlDdlBindError::InvalidAlterTableAddColumnDefault {
864            entity_name: entity_name.to_string(),
865            column_name: column_name.to_string(),
866            detail: "NULL cannot be used as an accepted database default".to_string(),
867        });
868    }
869
870    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(kind, default)
871        .unwrap_or_else(|| default.clone());
872    let contract =
873        AcceptedFieldDecodeContract::new(column_name, kind, nullable, storage_decode, leaf_codec);
874    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
875        |error| SqlDdlBindError::InvalidAlterTableAddColumnDefault {
876            entity_name: entity_name.to_string(),
877            column_name: column_name.to_string(),
878            detail: error.to_string(),
879        },
880    )?;
881
882    Ok(SchemaFieldDefault::SlotPayload(payload))
883}
884
885fn next_sql_ddl_field_id(accepted_before: &AcceptedSchemaSnapshot) -> FieldId {
886    let next = accepted_before
887        .persisted_snapshot()
888        .fields()
889        .iter()
890        .map(|field| field.id().get())
891        .max()
892        .unwrap_or(0)
893        .checked_add(1)
894        .expect("accepted field IDs should not be exhausted");
895
896    FieldId::new(next)
897}
898
899fn next_sql_ddl_field_slot(accepted_before: &AcceptedSchemaSnapshot) -> SchemaFieldSlot {
900    let next = accepted_before
901        .persisted_snapshot()
902        .row_layout()
903        .field_to_slot()
904        .iter()
905        .map(|(_, slot)| slot.get())
906        .max()
907        .unwrap_or(0)
908        .checked_add(1)
909        .expect("accepted row slots should not be exhausted");
910
911    SchemaFieldSlot::new(next)
912}
913
914fn persisted_field_contract_for_sql_column_type(
915    column_type: &str,
916) -> Option<(PersistedFieldKind, FieldStorageDecode, LeafCodec)> {
917    let normalized = column_type.trim().to_ascii_lowercase();
918    match normalized.as_str() {
919        "bool" | "boolean" => Some((
920            PersistedFieldKind::Bool,
921            FieldStorageDecode::ByKind,
922            LeafCodec::Scalar(ScalarCodec::Bool),
923        )),
924        "int" | "integer" => Some((
925            PersistedFieldKind::Int,
926            FieldStorageDecode::ByKind,
927            LeafCodec::Scalar(ScalarCodec::Int64),
928        )),
929        "nat" | "natural" => Some((
930            PersistedFieldKind::Nat,
931            FieldStorageDecode::ByKind,
932            LeafCodec::Scalar(ScalarCodec::Nat64),
933        )),
934        "text" | "string" => Some((
935            PersistedFieldKind::Text { max_len: None },
936            FieldStorageDecode::ByKind,
937            LeafCodec::Scalar(ScalarCodec::Text),
938        )),
939        _ => None,
940    }
941}
942
943#[derive(Clone, Debug, Eq, PartialEq)]
944pub(in crate::db) enum BoundSqlDdlCreateIndexKey {
945    FieldPath(BoundSqlDdlFieldPath),
946    Expression(BoundSqlDdlExpressionKey),
947}
948
949///
950/// BoundSqlDdlExpressionKey
951///
952/// Accepted expression-index key target for SQL DDL binding.
953///
954#[derive(Clone, Debug, Eq, PartialEq)]
955pub(in crate::db) struct BoundSqlDdlExpressionKey {
956    op: PersistedIndexExpressionOp,
957    source: BoundSqlDdlFieldPath,
958    canonical_sql: String,
959}
960
961impl BoundSqlDdlExpressionKey {
962    /// Return the accepted expression operation.
963    #[must_use]
964    pub(in crate::db) const fn op(&self) -> PersistedIndexExpressionOp {
965        self.op
966    }
967
968    /// Borrow the accepted source field path.
969    #[must_use]
970    pub(in crate::db) const fn source(&self) -> &BoundSqlDdlFieldPath {
971        &self.source
972    }
973
974    /// Borrow the SQL-facing canonical expression text.
975    #[must_use]
976    pub(in crate::db) const fn canonical_sql(&self) -> &str {
977        self.canonical_sql.as_str()
978    }
979}
980
981fn bind_create_index_key_item(
982    key_item: &SqlCreateIndexKeyItem,
983    entity_name: &str,
984    schema: &SchemaInfo,
985) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
986    match key_item {
987        SqlCreateIndexKeyItem::FieldPath(field_path) => {
988            bind_create_index_field_path(field_path.as_str(), entity_name, schema)
989                .map(BoundSqlDdlCreateIndexKey::FieldPath)
990        }
991        SqlCreateIndexKeyItem::Expression(expression) => {
992            bind_create_index_expression_key(expression, entity_name, schema)
993        }
994    }
995}
996
997fn bind_create_index_expression_key(
998    expression: &SqlCreateIndexExpressionKey,
999    entity_name: &str,
1000    schema: &SchemaInfo,
1001) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1002    let source = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
1003
1004    Ok(BoundSqlDdlCreateIndexKey::Expression(
1005        BoundSqlDdlExpressionKey {
1006            op: expression_op_from_sql_function(expression.function),
1007            source,
1008            canonical_sql: expression.canonical_sql(),
1009        },
1010    ))
1011}
1012
1013const fn expression_op_from_sql_function(
1014    function: crate::db::sql::parser::SqlCreateIndexExpressionFunction,
1015) -> PersistedIndexExpressionOp {
1016    match function {
1017        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Lower => {
1018            PersistedIndexExpressionOp::Lower
1019        }
1020        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Upper => {
1021            PersistedIndexExpressionOp::Upper
1022        }
1023        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Trim => {
1024            PersistedIndexExpressionOp::Trim
1025        }
1026    }
1027}
1028
1029fn key_items_are_field_path_only(key_items: &[BoundSqlDdlCreateIndexKey]) -> bool {
1030    key_items
1031        .iter()
1032        .all(|key_item| matches!(key_item, BoundSqlDdlCreateIndexKey::FieldPath(_)))
1033}
1034
1035fn create_index_field_path_report_items(
1036    key_items: &[BoundSqlDdlCreateIndexKey],
1037) -> Vec<BoundSqlDdlFieldPath> {
1038    key_items
1039        .iter()
1040        .map(|key_item| match key_item {
1041            BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.clone(),
1042            BoundSqlDdlCreateIndexKey::Expression(expression) => expression.source().clone(),
1043        })
1044        .collect()
1045}
1046
1047fn bind_create_index_field_path(
1048    field_path: &str,
1049    entity_name: &str,
1050    schema: &SchemaInfo,
1051) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
1052    let mut path = field_path
1053        .split('.')
1054        .map(str::trim)
1055        .filter(|segment| !segment.is_empty());
1056    let Some(root) = path.next() else {
1057        return Err(SqlDdlBindError::UnknownFieldPath {
1058            entity_name: entity_name.to_string(),
1059            field_path: field_path.to_string(),
1060        });
1061    };
1062    let segments = path.map(str::to_string).collect::<Vec<_>>();
1063
1064    let capabilities = if segments.is_empty() {
1065        schema.sql_capabilities(root)
1066    } else {
1067        schema.nested_sql_capabilities(root, segments.as_slice())
1068    }
1069    .ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
1070        entity_name: entity_name.to_string(),
1071        field_path: field_path.to_string(),
1072    })?;
1073
1074    if !capabilities.orderable() {
1075        return Err(SqlDdlBindError::FieldPathNotIndexable {
1076            field_path: field_path.to_string(),
1077        });
1078    }
1079
1080    let mut accepted_path = Vec::with_capacity(segments.len() + 1);
1081    accepted_path.push(root.to_string());
1082    accepted_path.extend(segments.iter().cloned());
1083
1084    Ok(BoundSqlDdlFieldPath {
1085        root: root.to_string(),
1086        segments,
1087        accepted_path,
1088    })
1089}
1090
1091fn find_field_path_index_by_name<'a>(
1092    schema: &'a SchemaInfo,
1093    index_name: &str,
1094) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
1095    schema
1096        .field_path_indexes()
1097        .iter()
1098        .find(|index| index.name() == index_name)
1099}
1100
1101fn existing_field_path_index_matches_request(
1102    index: &crate::db::schema::SchemaIndexInfo,
1103    field_paths: &[BoundSqlDdlFieldPath],
1104    predicate_sql: Option<&str>,
1105    uniqueness: SqlCreateIndexUniqueness,
1106) -> bool {
1107    let fields = index.fields();
1108
1109    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1110        && index.predicate_sql() == predicate_sql
1111        && fields.len() == field_paths.len()
1112        && fields
1113            .iter()
1114            .zip(field_paths)
1115            .all(|(field, requested)| field.path() == requested.accepted_path())
1116}
1117
1118fn find_expression_index_by_name<'a>(
1119    schema: &'a SchemaInfo,
1120    index_name: &str,
1121) -> Option<&'a SchemaExpressionIndexInfo> {
1122    schema
1123        .expression_indexes()
1124        .iter()
1125        .find(|index| index.name() == index_name)
1126}
1127
1128fn existing_expression_index_matches_request(
1129    index: &SchemaExpressionIndexInfo,
1130    key_items: &[BoundSqlDdlCreateIndexKey],
1131    predicate_sql: Option<&str>,
1132    uniqueness: SqlCreateIndexUniqueness,
1133) -> bool {
1134    let existing_key_items = index.key_items();
1135
1136    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1137        && index.predicate_sql() == predicate_sql
1138        && existing_key_items.len() == key_items.len()
1139        && existing_key_items
1140            .iter()
1141            .zip(key_items)
1142            .all(existing_expression_key_item_matches_request)
1143}
1144
1145fn existing_expression_key_item_matches_request(
1146    existing: (
1147        &SchemaExpressionIndexKeyItemInfo,
1148        &BoundSqlDdlCreateIndexKey,
1149    ),
1150) -> bool {
1151    let (existing, requested) = existing;
1152    match (existing, requested) {
1153        (
1154            SchemaExpressionIndexKeyItemInfo::FieldPath(existing),
1155            BoundSqlDdlCreateIndexKey::FieldPath(requested),
1156        ) => existing.path() == requested.accepted_path(),
1157        (
1158            SchemaExpressionIndexKeyItemInfo::Expression(existing),
1159            BoundSqlDdlCreateIndexKey::Expression(requested),
1160        ) => existing_expression_component_matches_request(
1161            existing.op(),
1162            existing.source().path(),
1163            existing.canonical_text(),
1164            requested,
1165        ),
1166        _ => false,
1167    }
1168}
1169
1170fn existing_expression_component_matches_request(
1171    existing_op: PersistedIndexExpressionOp,
1172    existing_path: &[String],
1173    existing_canonical_text: &str,
1174    requested: &BoundSqlDdlExpressionKey,
1175) -> bool {
1176    let requested_path = requested.source().accepted_path();
1177    let requested_canonical_text = format!("expr:v1:{}", requested.canonical_sql());
1178
1179    existing_op == requested.op()
1180        && existing_path == requested_path
1181        && existing_canonical_text == requested_canonical_text
1182}
1183
1184fn reject_duplicate_expression_index(
1185    key_items: &[BoundSqlDdlCreateIndexKey],
1186    predicate_sql: Option<&str>,
1187    schema: &SchemaInfo,
1188) -> Result<(), SqlDdlBindError> {
1189    let Some(existing_index) = schema.expression_indexes().iter().find(|index| {
1190        existing_expression_index_matches_request(
1191            index,
1192            key_items,
1193            predicate_sql,
1194            if index.unique() {
1195                SqlCreateIndexUniqueness::Unique
1196            } else {
1197                SqlCreateIndexUniqueness::NonUnique
1198            },
1199        )
1200    }) else {
1201        return Ok(());
1202    };
1203
1204    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1205        field_path: ddl_key_item_report(key_items).join(","),
1206        existing_index: existing_index.name().to_string(),
1207    })
1208}
1209
1210fn reject_duplicate_field_path_index(
1211    field_paths: &[BoundSqlDdlFieldPath],
1212    predicate_sql: Option<&str>,
1213    schema: &SchemaInfo,
1214) -> Result<(), SqlDdlBindError> {
1215    let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
1216        let fields = index.fields();
1217        index.predicate_sql() == predicate_sql
1218            && fields.len() == field_paths.len()
1219            && fields
1220                .iter()
1221                .zip(field_paths)
1222                .all(|(field, requested)| field.path() == requested.accepted_path())
1223    }) else {
1224        return Ok(());
1225    };
1226
1227    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1228        field_path: ddl_field_path_report(field_paths).join(","),
1229        existing_index: existing_index.name().to_string(),
1230    })
1231}
1232
1233fn candidate_index_snapshot(
1234    index_name: &str,
1235    key_items: &[BoundSqlDdlCreateIndexKey],
1236    predicate_sql: Option<&str>,
1237    uniqueness: SqlCreateIndexUniqueness,
1238    schema: &SchemaInfo,
1239    index_store_path: &'static str,
1240) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
1241    let key = if key_items_are_field_path_only(key_items) {
1242        PersistedIndexKeySnapshot::FieldPath(
1243            key_items
1244                .iter()
1245                .map(|key_item| {
1246                    let BoundSqlDdlCreateIndexKey::FieldPath(field_path) = key_item else {
1247                        unreachable!("field-path-only index checked before field-path lowering");
1248                    };
1249
1250                    accepted_index_field_path_snapshot(schema, field_path)
1251                })
1252                .collect::<Result<Vec<_>, _>>()?,
1253        )
1254    } else {
1255        PersistedIndexKeySnapshot::Items(
1256            key_items
1257                .iter()
1258                .map(|key_item| match key_item {
1259                    BoundSqlDdlCreateIndexKey::FieldPath(field_path) => {
1260                        accepted_index_field_path_snapshot(schema, field_path)
1261                            .map(PersistedIndexKeyItemSnapshot::FieldPath)
1262                    }
1263                    BoundSqlDdlCreateIndexKey::Expression(expression) => {
1264                        accepted_index_expression_snapshot(schema, expression)
1265                    }
1266                })
1267                .collect::<Result<Vec<_>, _>>()?,
1268        )
1269    };
1270
1271    Ok(PersistedIndexSnapshot::new_sql_ddl(
1272        schema.next_secondary_index_ordinal(),
1273        index_name.to_string(),
1274        index_store_path.to_string(),
1275        matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
1276        key,
1277        predicate_sql.map(str::to_string),
1278    ))
1279}
1280
1281fn accepted_index_field_path_snapshot(
1282    schema: &SchemaInfo,
1283    field_path: &BoundSqlDdlFieldPath,
1284) -> Result<crate::db::schema::PersistedIndexFieldPathSnapshot, SqlDdlBindError> {
1285    schema
1286        .accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
1287        .ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
1288            field_path: field_path.accepted_path().join("."),
1289        })
1290}
1291
1292fn accepted_index_expression_snapshot(
1293    schema: &SchemaInfo,
1294    expression: &BoundSqlDdlExpressionKey,
1295) -> Result<PersistedIndexKeyItemSnapshot, SqlDdlBindError> {
1296    let source = accepted_index_field_path_snapshot(schema, expression.source())?;
1297    let Some(output_kind) = expression_output_kind(expression.op(), source.kind()) else {
1298        return Err(SqlDdlBindError::FieldPathNotIndexable {
1299            field_path: expression.source().accepted_path().join("."),
1300        });
1301    };
1302
1303    Ok(PersistedIndexKeyItemSnapshot::Expression(Box::new(
1304        PersistedIndexExpressionSnapshot::new(
1305            expression.op(),
1306            source.clone(),
1307            source.kind().clone(),
1308            output_kind,
1309            format!("expr:v1:{}", expression.canonical_sql()),
1310        ),
1311    )))
1312}
1313
1314fn expression_output_kind(
1315    op: PersistedIndexExpressionOp,
1316    source_kind: &PersistedFieldKind,
1317) -> Option<PersistedFieldKind> {
1318    match op {
1319        PersistedIndexExpressionOp::Lower
1320        | PersistedIndexExpressionOp::Upper
1321        | PersistedIndexExpressionOp::Trim
1322        | PersistedIndexExpressionOp::LowerTrim => {
1323            if matches!(source_kind, PersistedFieldKind::Text { .. }) {
1324                Some(source_kind.clone())
1325            } else {
1326                None
1327            }
1328        }
1329        PersistedIndexExpressionOp::Date => {
1330            if matches!(
1331                source_kind,
1332                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1333            ) {
1334                Some(PersistedFieldKind::Date)
1335            } else {
1336                None
1337            }
1338        }
1339        PersistedIndexExpressionOp::Year
1340        | PersistedIndexExpressionOp::Month
1341        | PersistedIndexExpressionOp::Day => {
1342            if matches!(
1343                source_kind,
1344                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1345            ) {
1346                Some(PersistedFieldKind::Int)
1347            } else {
1348                None
1349            }
1350        }
1351    }
1352}
1353
1354fn validated_create_index_predicate_sql(
1355    predicate_sql: Option<&str>,
1356    schema: &SchemaInfo,
1357) -> Result<Option<String>, SqlDdlBindError> {
1358    let Some(predicate_sql) = predicate_sql else {
1359        return Ok(None);
1360    };
1361    let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
1362        SqlDdlBindError::InvalidFilteredIndexPredicate {
1363            detail: error.to_string(),
1364        }
1365    })?;
1366    validate_predicate(schema, &predicate).map_err(|error| {
1367        SqlDdlBindError::InvalidFilteredIndexPredicate {
1368            detail: error.to_string(),
1369        }
1370    })?;
1371
1372    Ok(Some(predicate_sql.to_string()))
1373}
1374
1375fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
1376    match field_paths {
1377        [field_path] => field_path.accepted_path().to_vec(),
1378        _ => vec![
1379            field_paths
1380                .iter()
1381                .map(|field_path| field_path.accepted_path().join("."))
1382                .collect::<Vec<_>>()
1383                .join(","),
1384        ],
1385    }
1386}
1387
1388fn ddl_key_item_report(key_items: &[BoundSqlDdlCreateIndexKey]) -> Vec<String> {
1389    match key_items {
1390        [key_item] => vec![ddl_key_item_text(key_item)],
1391        _ => vec![
1392            key_items
1393                .iter()
1394                .map(ddl_key_item_text)
1395                .collect::<Vec<_>>()
1396                .join(","),
1397        ],
1398    }
1399}
1400
1401fn ddl_key_item_text(key_item: &BoundSqlDdlCreateIndexKey) -> String {
1402    match key_item {
1403        BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.accepted_path().join("."),
1404        BoundSqlDdlCreateIndexKey::Expression(expression) => expression.canonical_sql().to_string(),
1405    }
1406}
1407
1408/// Lower one bound SQL DDL request through schema mutation admission.
1409pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
1410    request: &BoundSqlDdlRequest,
1411) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
1412    match request.statement() {
1413        BoundSqlDdlStatement::AddColumn(add) => {
1414            Ok(admit_sql_ddl_field_addition_candidate(add.field()))
1415        }
1416        BoundSqlDdlStatement::CreateIndex(create) => {
1417            if create.candidate_index().key().is_field_path_only() {
1418                admit_sql_ddl_field_path_index_candidate(create.candidate_index())
1419            } else {
1420                admit_sql_ddl_expression_index_candidate(create.candidate_index())
1421            }
1422        }
1423        BoundSqlDdlStatement::DropIndex(drop) => {
1424            admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
1425        }
1426        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1427    }
1428    .map_err(SqlDdlLoweringError::MutationAdmission)
1429}
1430
1431/// Derive the accepted-after schema snapshot for one bound SQL DDL request.
1432pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
1433    accepted_before: &AcceptedSchemaSnapshot,
1434    request: &BoundSqlDdlRequest,
1435) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
1436    match request.statement() {
1437        BoundSqlDdlStatement::AddColumn(add) => {
1438            derive_sql_ddl_field_addition_accepted_after(accepted_before, add.field().clone())
1439        }
1440        BoundSqlDdlStatement::CreateIndex(create) => {
1441            if create.candidate_index().key().is_field_path_only() {
1442                derive_sql_ddl_field_path_index_accepted_after(
1443                    accepted_before,
1444                    create.candidate_index().clone(),
1445                )
1446            } else {
1447                derive_sql_ddl_expression_index_accepted_after(
1448                    accepted_before,
1449                    create.candidate_index().clone(),
1450                )
1451            }
1452        }
1453        BoundSqlDdlStatement::DropIndex(drop) => {
1454            derive_sql_ddl_secondary_index_drop_accepted_after(
1455                accepted_before,
1456                drop.dropped_index(),
1457            )
1458        }
1459        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1460    }
1461    .map_err(SqlDdlLoweringError::MutationAdmission)
1462}
1463
1464fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
1465    match bound.statement() {
1466        BoundSqlDdlStatement::AddColumn(add) => SqlDdlPreparationReport {
1467            mutation_kind: if add.field().default().is_none() {
1468                SqlDdlMutationKind::AddNullableField
1469            } else {
1470                SqlDdlMutationKind::AddDefaultedField
1471            },
1472            target_index: add.field().name().to_string(),
1473            target_store: add.entity_name().to_string(),
1474            field_path: vec![add.field().name().to_string()],
1475            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1476            rows_scanned: 0,
1477            index_keys_written: 0,
1478        },
1479        BoundSqlDdlStatement::CreateIndex(create) => {
1480            let target = create.candidate_index();
1481
1482            SqlDdlPreparationReport {
1483                mutation_kind: if target.key().is_field_path_only() {
1484                    SqlDdlMutationKind::AddFieldPathIndex
1485                } else {
1486                    SqlDdlMutationKind::AddExpressionIndex
1487                },
1488                target_index: target.name().to_string(),
1489                target_store: target.store().to_string(),
1490                field_path: ddl_key_item_report(create.key_items()),
1491                execution_status: SqlDdlExecutionStatus::PreparedOnly,
1492                rows_scanned: 0,
1493                index_keys_written: 0,
1494            }
1495        }
1496        BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
1497            mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
1498            target_index: drop.index_name().to_string(),
1499            target_store: drop.dropped_index().store().to_string(),
1500            field_path: drop.field_path().to_vec(),
1501            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1502            rows_scanned: 0,
1503            index_keys_written: 0,
1504        },
1505        BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
1506            mutation_kind: no_op.mutation_kind(),
1507            target_index: no_op.index_name().to_string(),
1508            target_store: no_op.target_store().to_string(),
1509            field_path: no_op.field_path().to_vec(),
1510            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1511            rows_scanned: 0,
1512            index_keys_written: 0,
1513        },
1514    }
1515}