Skip to main content

icydb_core/db/sql/
ddl.rs

1//! Module: db::sql::ddl
2//! Responsibility: bind parsed SQL DDL to accepted schema catalog contracts.
3//! Does not own: mutation planning, physical index rebuilds, or SQL execution.
4//! Boundary: translates parser-owned DDL syntax into catalog-native requests.
5
6#![allow(
7    dead_code,
8    reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
9)]
10
11use crate::db::{
12    data::encode_runtime_value_for_accepted_field_contract,
13    predicate::parse_sql_predicate,
14    query::predicate::validate_predicate,
15    schema::{
16        AcceptedFieldDecodeContract, AcceptedSchemaSnapshot, FieldId, PersistedFieldKind,
17        PersistedFieldOrigin, PersistedFieldSnapshot, PersistedIndexExpressionOp,
18        PersistedIndexExpressionSnapshot, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
19        PersistedIndexSnapshot, SchemaDdlAcceptedSnapshotDerivation,
20        SchemaDdlIndexDropCandidateError, SchemaDdlMutationAdmission,
21        SchemaDdlMutationAdmissionError, SchemaExpressionIndexInfo,
22        SchemaExpressionIndexKeyItemInfo, SchemaFieldDefault, SchemaFieldSlot,
23        SchemaFieldWritePolicy, SchemaInfo, admit_sql_ddl_expression_index_candidate,
24        admit_sql_ddl_field_addition_candidate, admit_sql_ddl_field_default_candidate,
25        admit_sql_ddl_field_nullability_candidate, admit_sql_ddl_field_path_index_candidate,
26        admit_sql_ddl_secondary_index_drop_candidate,
27        canonicalize_strict_sql_literal_for_persisted_kind,
28        derive_sql_ddl_expression_index_accepted_after,
29        derive_sql_ddl_field_addition_accepted_after, derive_sql_ddl_field_default_accepted_after,
30        derive_sql_ddl_field_nullability_accepted_after,
31        derive_sql_ddl_field_path_index_accepted_after,
32        derive_sql_ddl_secondary_index_drop_accepted_after,
33        resolve_sql_ddl_field_drop_dependent_index, resolve_sql_ddl_secondary_index_drop_candidate,
34    },
35    sql::{
36        identifier::identifiers_tail_match,
37        parser::{
38            SqlAlterColumnAction, SqlAlterTableAddColumnStatement,
39            SqlAlterTableAlterColumnStatement, SqlAlterTableDropColumnStatement,
40            SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem, SqlCreateIndexStatement,
41            SqlCreateIndexUniqueness, SqlDdlStatement, SqlDropIndexStatement, SqlStatement,
42        },
43    },
44};
45use crate::model::field::{FieldStorageDecode, LeafCodec, ScalarCodec};
46use thiserror::Error as ThisError;
47
48///
49/// PreparedSqlDdlCommand
50///
51/// Fully prepared SQL DDL command. This is intentionally not executable yet:
52/// it packages the accepted-catalog binding, accepted-after derivation, and
53/// schema mutation admission proof for the future execution boundary.
54///
55#[derive(Clone, Debug, Eq, PartialEq)]
56pub(in crate::db) struct PreparedSqlDdlCommand {
57    bound: BoundSqlDdlRequest,
58    derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
59    report: SqlDdlPreparationReport,
60}
61
62impl PreparedSqlDdlCommand {
63    /// Borrow the accepted-catalog-bound DDL request.
64    #[must_use]
65    pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
66        &self.bound
67    }
68
69    /// Borrow the accepted-after derivation proof.
70    #[must_use]
71    pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
72        self.derivation.as_ref()
73    }
74
75    /// Borrow the developer-facing preparation report.
76    #[must_use]
77    pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
78        &self.report
79    }
80
81    /// Return whether this prepared command needs schema or storage mutation.
82    #[must_use]
83    pub(in crate::db) const fn mutates_schema(&self) -> bool {
84        self.derivation.is_some()
85    }
86}
87
88///
89/// SqlDdlPreparationReport
90///
91/// Compact report for a DDL command that has passed all pre-execution
92/// frontend and schema-mutation checks.
93///
94#[derive(Clone, Debug, Eq, PartialEq)]
95pub struct SqlDdlPreparationReport {
96    mutation_kind: SqlDdlMutationKind,
97    target_index: String,
98    target_store: String,
99    field_path: Vec<String>,
100    execution_status: SqlDdlExecutionStatus,
101    rows_scanned: usize,
102    index_keys_written: usize,
103}
104
105impl SqlDdlPreparationReport {
106    /// Return the prepared DDL mutation kind.
107    #[must_use]
108    pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
109        self.mutation_kind
110    }
111
112    /// Borrow the target accepted index name.
113    #[must_use]
114    pub const fn target_index(&self) -> &str {
115        self.target_index.as_str()
116    }
117
118    /// Borrow the target accepted index store path.
119    #[must_use]
120    pub const fn target_store(&self) -> &str {
121        self.target_store.as_str()
122    }
123
124    /// Borrow the target field path.
125    #[must_use]
126    pub const fn field_path(&self) -> &[String] {
127        self.field_path.as_slice()
128    }
129
130    /// Return the execution status captured by this DDL report.
131    #[must_use]
132    pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
133        self.execution_status
134    }
135
136    /// Return rows scanned by DDL execution.
137    #[must_use]
138    pub const fn rows_scanned(&self) -> usize {
139        self.rows_scanned
140    }
141
142    /// Return index keys written by DDL execution.
143    #[must_use]
144    pub const fn index_keys_written(&self) -> usize {
145        self.index_keys_written
146    }
147
148    pub(in crate::db) const fn with_execution_status(
149        mut self,
150        execution_status: SqlDdlExecutionStatus,
151    ) -> Self {
152        self.execution_status = execution_status;
153        self
154    }
155
156    pub(in crate::db) const fn with_execution_metrics(
157        mut self,
158        rows_scanned: usize,
159        index_keys_written: usize,
160    ) -> Self {
161        self.rows_scanned = rows_scanned;
162        self.index_keys_written = index_keys_written;
163        self
164    }
165}
166
167///
168/// SqlDdlMutationKind
169///
170/// Developer-facing SQL DDL mutation kind.
171///
172#[derive(Clone, Copy, Debug, Eq, PartialEq)]
173pub enum SqlDdlMutationKind {
174    AddDefaultedField,
175    AddNullableField,
176    SetFieldDefault,
177    DropFieldDefault,
178    SetFieldNotNull,
179    DropFieldNotNull,
180    DropField,
181    AddFieldPathIndex,
182    AddExpressionIndex,
183    DropSecondaryIndex,
184}
185
186impl SqlDdlMutationKind {
187    /// Return the stable diagnostic label for this DDL mutation kind.
188    #[must_use]
189    pub const fn as_str(self) -> &'static str {
190        match self {
191            Self::AddDefaultedField => "add_defaulted_field",
192            Self::AddNullableField => "add_nullable_field",
193            Self::SetFieldDefault => "set_field_default",
194            Self::DropFieldDefault => "drop_field_default",
195            Self::SetFieldNotNull => "set_field_not_null",
196            Self::DropFieldNotNull => "drop_field_not_null",
197            Self::DropField => "drop_field",
198            Self::AddFieldPathIndex => "add_field_path_index",
199            Self::AddExpressionIndex => "add_expression_index",
200            Self::DropSecondaryIndex => "drop_secondary_index",
201        }
202    }
203}
204
205///
206/// SqlDdlExecutionStatus
207///
208/// SQL DDL execution state at the current boundary.
209///
210#[derive(Clone, Copy, Debug, Eq, PartialEq)]
211pub enum SqlDdlExecutionStatus {
212    PreparedOnly,
213    Published,
214    NoOp,
215}
216
217impl SqlDdlExecutionStatus {
218    /// Return the stable diagnostic label for this execution status.
219    #[must_use]
220    pub const fn as_str(self) -> &'static str {
221        match self {
222            Self::PreparedOnly => "prepared_only",
223            Self::Published => "published",
224            Self::NoOp => "no_op",
225        }
226    }
227}
228
229///
230/// BoundSqlDdlRequest
231///
232/// Accepted-catalog SQL DDL request after parser syntax has been resolved
233/// against one runtime schema snapshot.
234///
235#[derive(Clone, Debug, Eq, PartialEq)]
236pub(in crate::db) struct BoundSqlDdlRequest {
237    statement: BoundSqlDdlStatement,
238}
239
240impl BoundSqlDdlRequest {
241    /// Borrow the bound statement payload.
242    #[must_use]
243    pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
244        &self.statement
245    }
246}
247
248///
249/// BoundSqlDdlStatement
250///
251/// Catalog-resolved DDL statement vocabulary.
252///
253#[derive(Clone, Debug, Eq, PartialEq)]
254pub(in crate::db) enum BoundSqlDdlStatement {
255    AddColumn(BoundSqlAddColumnRequest),
256    AlterColumnDefault(BoundSqlAlterColumnDefaultRequest),
257    AlterColumnNullability(BoundSqlAlterColumnNullabilityRequest),
258    CreateIndex(BoundSqlCreateIndexRequest),
259    DropIndex(BoundSqlDropIndexRequest),
260    NoOp(BoundSqlDdlNoOpRequest),
261}
262
263///
264/// BoundSqlAddColumnRequest
265///
266/// Catalog-resolved additive field DDL request.
267///
268#[derive(Clone, Debug, Eq, PartialEq)]
269pub(in crate::db) struct BoundSqlAddColumnRequest {
270    entity_name: String,
271    field: PersistedFieldSnapshot,
272}
273
274impl BoundSqlAddColumnRequest {
275    /// Borrow the accepted entity name.
276    #[must_use]
277    pub(in crate::db) const fn entity_name(&self) -> &str {
278        self.entity_name.as_str()
279    }
280
281    /// Borrow the accepted DDL-owned field snapshot to publish.
282    #[must_use]
283    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
284        &self.field
285    }
286}
287
288///
289/// BoundSqlAlterColumnDefaultRequest
290///
291/// Catalog-resolved field-default metadata DDL request.
292///
293#[derive(Clone, Debug, Eq, PartialEq)]
294pub(in crate::db) struct BoundSqlAlterColumnDefaultRequest {
295    entity_name: String,
296    field: PersistedFieldSnapshot,
297    default: SchemaFieldDefault,
298    mutation_kind: SqlDdlMutationKind,
299}
300
301impl BoundSqlAlterColumnDefaultRequest {
302    /// Borrow the accepted entity name.
303    #[must_use]
304    pub(in crate::db) const fn entity_name(&self) -> &str {
305        self.entity_name.as_str()
306    }
307
308    /// Borrow the accepted field name.
309    #[must_use]
310    pub(in crate::db) const fn field_name(&self) -> &str {
311        self.field.name()
312    }
313
314    /// Borrow the accepted field whose default will change.
315    #[must_use]
316    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
317        &self.field
318    }
319
320    /// Borrow the default contract to publish.
321    #[must_use]
322    pub(in crate::db) const fn default(&self) -> &SchemaFieldDefault {
323        &self.default
324    }
325
326    /// Return the field-default mutation kind.
327    #[must_use]
328    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
329        self.mutation_kind
330    }
331}
332
333///
334/// BoundSqlAlterColumnNullabilityRequest
335///
336/// Catalog-resolved field-nullability metadata DDL request.
337///
338#[derive(Clone, Debug, Eq, PartialEq)]
339pub(in crate::db) struct BoundSqlAlterColumnNullabilityRequest {
340    entity_name: String,
341    field: PersistedFieldSnapshot,
342    nullable: bool,
343    mutation_kind: SqlDdlMutationKind,
344}
345
346impl BoundSqlAlterColumnNullabilityRequest {
347    /// Borrow the accepted entity name.
348    #[must_use]
349    pub(in crate::db) const fn entity_name(&self) -> &str {
350        self.entity_name.as_str()
351    }
352
353    /// Borrow the accepted field name.
354    #[must_use]
355    pub(in crate::db) const fn field_name(&self) -> &str {
356        self.field.name()
357    }
358
359    /// Borrow the accepted field whose nullability will change.
360    #[must_use]
361    pub(in crate::db) const fn field(&self) -> &PersistedFieldSnapshot {
362        &self.field
363    }
364
365    /// Return the nullable contract to publish.
366    #[must_use]
367    pub(in crate::db) const fn nullable(&self) -> bool {
368        self.nullable
369    }
370
371    /// Return the field-nullability mutation kind.
372    #[must_use]
373    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
374        self.mutation_kind
375    }
376}
377
378///
379/// BoundSqlDdlNoOpRequest
380///
381/// Catalog-resolved idempotent DDL request that is already satisfied or absent.
382///
383#[derive(Clone, Debug, Eq, PartialEq)]
384pub(in crate::db) struct BoundSqlDdlNoOpRequest {
385    mutation_kind: SqlDdlMutationKind,
386    index_name: String,
387    entity_name: String,
388    target_store: String,
389    field_path: Vec<String>,
390}
391
392impl BoundSqlDdlNoOpRequest {
393    /// Return the user-facing mutation family this no-op belongs to.
394    #[must_use]
395    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
396        self.mutation_kind
397    }
398
399    /// Borrow the requested index name.
400    #[must_use]
401    pub(in crate::db) const fn index_name(&self) -> &str {
402        self.index_name.as_str()
403    }
404
405    /// Borrow the accepted entity name that owns this request.
406    #[must_use]
407    pub(in crate::db) const fn entity_name(&self) -> &str {
408        self.entity_name.as_str()
409    }
410
411    /// Borrow the accepted index store path, or `-` when no target exists.
412    #[must_use]
413    pub(in crate::db) const fn target_store(&self) -> &str {
414        self.target_store.as_str()
415    }
416
417    /// Borrow the target field path, empty when no target exists.
418    #[must_use]
419    pub(in crate::db) const fn field_path(&self) -> &[String] {
420        self.field_path.as_slice()
421    }
422}
423
424///
425/// BoundSqlCreateIndexRequest
426///
427/// Catalog-resolved request for adding one secondary index.
428///
429#[derive(Clone, Debug, Eq, PartialEq)]
430pub(in crate::db) struct BoundSqlCreateIndexRequest {
431    index_name: String,
432    entity_name: String,
433    key_items: Vec<BoundSqlDdlCreateIndexKey>,
434    field_paths: Vec<BoundSqlDdlFieldPath>,
435    candidate_index: PersistedIndexSnapshot,
436}
437
438impl BoundSqlCreateIndexRequest {
439    /// Borrow the requested index name.
440    #[must_use]
441    pub(in crate::db) const fn index_name(&self) -> &str {
442        self.index_name.as_str()
443    }
444
445    /// Borrow the accepted entity name that owns this request.
446    #[must_use]
447    pub(in crate::db) const fn entity_name(&self) -> &str {
448        self.entity_name.as_str()
449    }
450
451    /// Borrow the accepted field-path targets.
452    #[must_use]
453    pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
454        self.field_paths.as_slice()
455    }
456
457    /// Borrow the accepted key targets in DDL key order.
458    #[must_use]
459    pub(in crate::db) const fn key_items(&self) -> &[BoundSqlDdlCreateIndexKey] {
460        self.key_items.as_slice()
461    }
462
463    /// Borrow the candidate accepted index snapshot for mutation admission.
464    #[must_use]
465    pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
466        &self.candidate_index
467    }
468}
469
470///
471/// BoundSqlDropIndexRequest
472///
473/// Catalog-resolved request for dropping one DDL-published secondary index.
474///
475#[derive(Clone, Debug, Eq, PartialEq)]
476pub(in crate::db) struct BoundSqlDropIndexRequest {
477    index_name: String,
478    entity_name: String,
479    dropped_index: PersistedIndexSnapshot,
480    field_path: Vec<String>,
481}
482
483impl BoundSqlDropIndexRequest {
484    /// Borrow the requested index name.
485    #[must_use]
486    pub(in crate::db) const fn index_name(&self) -> &str {
487        self.index_name.as_str()
488    }
489
490    /// Borrow the accepted entity name that owns this request.
491    #[must_use]
492    pub(in crate::db) const fn entity_name(&self) -> &str {
493        self.entity_name.as_str()
494    }
495
496    /// Borrow the accepted index snapshot that will be removed.
497    #[must_use]
498    pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
499        &self.dropped_index
500    }
501
502    /// Borrow the dropped field-path target.
503    #[must_use]
504    pub(in crate::db) const fn field_path(&self) -> &[String] {
505        self.field_path.as_slice()
506    }
507}
508
509///
510/// BoundSqlDdlFieldPath
511///
512/// Accepted field-path target for SQL DDL binding.
513///
514#[derive(Clone, Debug, Eq, PartialEq)]
515pub(in crate::db) struct BoundSqlDdlFieldPath {
516    root: String,
517    segments: Vec<String>,
518    accepted_path: Vec<String>,
519}
520
521impl BoundSqlDdlFieldPath {
522    /// Borrow the top-level field name.
523    #[must_use]
524    pub(in crate::db) const fn root(&self) -> &str {
525        self.root.as_str()
526    }
527
528    /// Borrow nested path segments below the top-level field.
529    #[must_use]
530    pub(in crate::db) const fn segments(&self) -> &[String] {
531        self.segments.as_slice()
532    }
533
534    /// Borrow the full accepted field path used by index metadata.
535    #[must_use]
536    pub(in crate::db) const fn accepted_path(&self) -> &[String] {
537        self.accepted_path.as_slice()
538    }
539}
540
541///
542/// SqlDdlBindError
543///
544/// Typed fail-closed reasons for SQL DDL catalog binding.
545///
546#[derive(Debug, Eq, PartialEq, ThisError)]
547pub(in crate::db) enum SqlDdlBindError {
548    #[error("SQL DDL binder requires a DDL statement")]
549    NotDdl,
550
551    #[error("accepted schema does not expose an entity name")]
552    MissingEntityName,
553
554    #[error("accepted schema does not expose an entity path")]
555    MissingEntityPath,
556
557    #[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
558    EntityMismatch {
559        sql_entity: String,
560        expected_entity: String,
561    },
562
563    #[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
564    UnknownFieldPath {
565        entity_name: String,
566        field_path: String,
567    },
568
569    #[error("field path '{field_path}' is not indexable")]
570    FieldPathNotIndexable { field_path: String },
571
572    #[error("field path '{field_path}' depends on generated-only metadata")]
573    FieldPathNotAcceptedCatalogBacked { field_path: String },
574
575    #[error("invalid filtered index predicate: {detail}")]
576    InvalidFilteredIndexPredicate { detail: String },
577
578    #[error("index name '{index_name}' already exists in the accepted schema")]
579    DuplicateIndexName { index_name: String },
580
581    #[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
582    DuplicateFieldPathIndex {
583        field_path: String,
584        existing_index: String,
585    },
586
587    #[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
588    UnknownIndex {
589        entity_name: String,
590        index_name: String,
591    },
592
593    #[error(
594        "index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
595    )]
596    GeneratedIndexDropRejected { index_name: String },
597
598    #[error(
599        "index '{index_name}' is not a supported DDL-droppable secondary index; SQL DDL can currently drop only indexes created through SQL DDL"
600    )]
601    UnsupportedDropIndex { index_name: String },
602
603    #[error(
604        "SQL DDL ALTER TABLE ADD COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
605    )]
606    UnsupportedAlterTableAddColumn {
607        entity_name: String,
608        column_name: String,
609    },
610
611    #[error(
612        "SQL DDL ALTER TABLE ADD COLUMN DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
613    )]
614    InvalidAlterTableAddColumnDefault {
615        entity_name: String,
616        column_name: String,
617        detail: String,
618    },
619
620    #[error(
621        "SQL DDL ALTER TABLE ADD COLUMN NOT NULL is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
622    )]
623    UnsupportedAlterTableAddColumnNotNull {
624        entity_name: String,
625        column_name: String,
626    },
627
628    #[error("field '{column_name}' already exists in accepted entity '{entity_name}'")]
629    DuplicateColumn {
630        entity_name: String,
631        column_name: String,
632    },
633
634    #[error(
635        "SQL DDL ALTER TABLE ADD COLUMN type '{column_type}' is not supported yet for accepted entity '{entity_name}' column '{column_name}'"
636    )]
637    UnsupportedAlterTableAddColumnType {
638        entity_name: String,
639        column_name: String,
640        column_type: String,
641    },
642
643    #[error("unknown column '{column_name}' for accepted entity '{entity_name}'")]
644    UnknownColumn {
645        entity_name: String,
646        column_name: String,
647    },
648
649    #[error(
650        "SQL DDL ALTER TABLE ALTER COLUMN {action} is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
651    )]
652    UnsupportedAlterTableAlterColumn {
653        entity_name: String,
654        column_name: String,
655        action: String,
656    },
657
658    #[error(
659        "SQL DDL ALTER TABLE ALTER COLUMN SET DEFAULT value is not encodable for accepted entity '{entity_name}' column '{column_name}': {detail}"
660    )]
661    InvalidAlterTableAlterColumnDefault {
662        entity_name: String,
663        column_name: String,
664        detail: String,
665    },
666
667    #[error(
668        "SQL DDL ALTER TABLE ALTER COLUMN DROP DEFAULT is not executable yet for required accepted entity '{entity_name}' column '{column_name}'"
669    )]
670    UnsupportedAlterTableDropDefaultRequired {
671        entity_name: String,
672        column_name: String,
673    },
674
675    #[error(
676        "SQL DDL ALTER TABLE ALTER COLUMN DEFAULT cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema default instead"
677    )]
678    GeneratedFieldDefaultChangeRejected {
679        entity_name: String,
680        column_name: String,
681    },
682
683    #[error(
684        "SQL DDL ALTER TABLE ALTER COLUMN NULLABILITY cannot change generated accepted field '{column_name}' on entity '{entity_name}'; change the Rust schema nullability instead"
685    )]
686    GeneratedFieldNullabilityChangeRejected {
687        entity_name: String,
688        column_name: String,
689    },
690
691    #[error(
692        "SQL DDL ALTER TABLE DROP COLUMN cannot drop primary-key field '{column_name}' on entity '{entity_name}'"
693    )]
694    PrimaryKeyFieldDropRejected {
695        entity_name: String,
696        column_name: String,
697    },
698
699    #[error(
700        "SQL DDL ALTER TABLE DROP COLUMN cannot change generated accepted field '{column_name}' on entity '{entity_name}'; remove the field from the Rust schema instead"
701    )]
702    GeneratedFieldDropRejected {
703        entity_name: String,
704        column_name: String,
705    },
706
707    #[error(
708        "SQL DDL ALTER TABLE DROP COLUMN cannot drop accepted field '{column_name}' on entity '{entity_name}' while index '{index_name}' depends on it; drop dependent DDL-owned indexes first"
709    )]
710    IndexedFieldDropRejected {
711        entity_name: String,
712        column_name: String,
713        index_name: String,
714    },
715
716    #[error(
717        "SQL DDL ALTER TABLE DROP COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'; retained-slot field removal policy is not enabled yet"
718    )]
719    UnsupportedAlterTableDropColumn {
720        entity_name: String,
721        column_name: String,
722    },
723}
724
725///
726/// SqlDdlLoweringError
727///
728/// Typed fail-closed reasons while lowering bound DDL into schema mutation
729/// admission.
730///
731#[derive(Debug, Eq, PartialEq, ThisError)]
732pub(in crate::db) enum SqlDdlLoweringError {
733    #[error("SQL DDL lowering requires a supported DDL statement")]
734    UnsupportedStatement,
735
736    #[error("schema mutation admission rejected DDL candidate: {0:?}")]
737    MutationAdmission(SchemaDdlMutationAdmissionError),
738}
739
740///
741/// SqlDdlPrepareError
742///
743/// Typed fail-closed preparation errors for SQL DDL.
744///
745#[derive(Debug, Eq, PartialEq, ThisError)]
746pub(in crate::db) enum SqlDdlPrepareError {
747    #[error("{0}")]
748    Bind(#[from] SqlDdlBindError),
749
750    #[error("{0}")]
751    Lowering(#[from] SqlDdlLoweringError),
752}
753
754/// Prepare one parsed SQL DDL statement through every pre-execution proof.
755pub(in crate::db) fn prepare_sql_ddl_statement(
756    statement: &SqlStatement,
757    accepted_before: &AcceptedSchemaSnapshot,
758    schema: &SchemaInfo,
759    index_store_path: &'static str,
760) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
761    let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
762    let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
763        None
764    } else {
765        Some(derive_bound_sql_ddl_accepted_after(
766            accepted_before,
767            &bound,
768        )?)
769    };
770    let report = ddl_preparation_report(&bound);
771
772    Ok(PreparedSqlDdlCommand {
773        bound,
774        derivation,
775        report,
776    })
777}
778
779/// Bind one parsed SQL DDL statement against accepted catalog metadata.
780pub(in crate::db) fn bind_sql_ddl_statement(
781    statement: &SqlStatement,
782    accepted_before: &AcceptedSchemaSnapshot,
783    schema: &SchemaInfo,
784    index_store_path: &'static str,
785) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
786    let SqlStatement::Ddl(ddl) = statement else {
787        return Err(SqlDdlBindError::NotDdl);
788    };
789
790    match ddl {
791        SqlDdlStatement::CreateIndex(statement) => {
792            bind_create_index_statement(statement, schema, index_store_path)
793        }
794        SqlDdlStatement::DropIndex(statement) => {
795            bind_drop_index_statement(statement, accepted_before, schema)
796        }
797        SqlDdlStatement::AlterTableAddColumn(statement) => {
798            bind_alter_table_add_column_statement(statement, accepted_before, schema)
799        }
800        SqlDdlStatement::AlterTableAlterColumn(statement) => {
801            bind_alter_table_alter_column_statement(statement, accepted_before, schema)
802        }
803        SqlDdlStatement::AlterTableDropColumn(statement) => {
804            bind_alter_table_drop_column_statement(statement, accepted_before, schema)
805        }
806    }
807}
808
809fn bind_create_index_statement(
810    statement: &SqlCreateIndexStatement,
811    schema: &SchemaInfo,
812    index_store_path: &'static str,
813) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
814    let entity_name = schema
815        .entity_name()
816        .ok_or(SqlDdlBindError::MissingEntityName)?;
817
818    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
819        return Err(SqlDdlBindError::EntityMismatch {
820            sql_entity: statement.entity.clone(),
821            expected_entity: entity_name.to_string(),
822        });
823    }
824
825    let key_items = statement
826        .key_items
827        .iter()
828        .map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
829        .collect::<Result<Vec<_>, _>>()?;
830    let field_paths = create_index_field_path_report_items(key_items.as_slice());
831    if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
832        if key_items_are_field_path_only(key_items.as_slice())
833            && statement.if_not_exists
834            && existing_field_path_index_matches_request(
835                existing_index,
836                field_paths.as_slice(),
837                statement.predicate_sql.as_deref(),
838                statement.uniqueness,
839            )
840        {
841            return Ok(BoundSqlDdlRequest {
842                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
843                    mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
844                    index_name: statement.name.clone(),
845                    entity_name: entity_name.to_string(),
846                    target_store: existing_index.store().to_string(),
847                    field_path: ddl_field_path_report(field_paths.as_slice()),
848                }),
849            });
850        }
851
852        return Err(SqlDdlBindError::DuplicateIndexName {
853            index_name: statement.name.clone(),
854        });
855    }
856    let predicate_sql =
857        validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
858    if let Some(existing_index) = find_expression_index_by_name(schema, statement.name.as_str()) {
859        if statement.if_not_exists
860            && existing_expression_index_matches_request(
861                existing_index,
862                key_items.as_slice(),
863                predicate_sql.as_deref(),
864                statement.uniqueness,
865            )
866        {
867            return Ok(BoundSqlDdlRequest {
868                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
869                    mutation_kind: SqlDdlMutationKind::AddExpressionIndex,
870                    index_name: statement.name.clone(),
871                    entity_name: entity_name.to_string(),
872                    target_store: existing_index.store().to_string(),
873                    field_path: ddl_key_item_report(key_items.as_slice()),
874                }),
875            });
876        }
877
878        return Err(SqlDdlBindError::DuplicateIndexName {
879            index_name: statement.name.clone(),
880        });
881    }
882    if key_items_are_field_path_only(key_items.as_slice()) {
883        reject_duplicate_field_path_index(
884            field_paths.as_slice(),
885            predicate_sql.as_deref(),
886            schema,
887        )?;
888    } else {
889        reject_duplicate_expression_index(key_items.as_slice(), predicate_sql.as_deref(), schema)?;
890    }
891    let candidate_index = candidate_index_snapshot(
892        statement.name.as_str(),
893        key_items.as_slice(),
894        predicate_sql.as_deref(),
895        statement.uniqueness,
896        schema,
897        index_store_path,
898    )?;
899
900    Ok(BoundSqlDdlRequest {
901        statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
902            index_name: statement.name.clone(),
903            entity_name: entity_name.to_string(),
904            key_items,
905            field_paths,
906            candidate_index,
907        }),
908    })
909}
910
911fn bind_drop_index_statement(
912    statement: &SqlDropIndexStatement,
913    accepted_before: &AcceptedSchemaSnapshot,
914    schema: &SchemaInfo,
915) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
916    let entity_name = schema
917        .entity_name()
918        .ok_or(SqlDdlBindError::MissingEntityName)?;
919
920    if let Some(sql_entity) = statement.entity.as_deref()
921        && !identifiers_tail_match(sql_entity, entity_name)
922    {
923        return Err(SqlDdlBindError::EntityMismatch {
924            sql_entity: sql_entity.to_string(),
925            expected_entity: entity_name.to_string(),
926        });
927    }
928    let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
929        accepted_before,
930        &statement.name,
931    )
932    .map_err(|error| match error {
933        SchemaDdlIndexDropCandidateError::Generated => {
934            SqlDdlBindError::GeneratedIndexDropRejected {
935                index_name: statement.name.clone(),
936            }
937        }
938        SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
939            entity_name: entity_name.to_string(),
940            index_name: statement.name.clone(),
941        },
942        SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
943            index_name: statement.name.clone(),
944        },
945    });
946    let (dropped_index, field_path) = match drop_candidate {
947        Ok((dropped_index, field_path)) => (dropped_index, field_path),
948        Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
949            return Ok(BoundSqlDdlRequest {
950                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
951                    mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
952                    index_name: statement.name.clone(),
953                    entity_name: entity_name.to_string(),
954                    target_store: "-".to_string(),
955                    field_path: Vec::new(),
956                }),
957            });
958        }
959        Err(error) => return Err(error),
960    };
961    Ok(BoundSqlDdlRequest {
962        statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
963            index_name: statement.name.clone(),
964            entity_name: entity_name.to_string(),
965            dropped_index,
966            field_path,
967        }),
968    })
969}
970
971fn bind_alter_table_add_column_statement(
972    statement: &SqlAlterTableAddColumnStatement,
973    accepted_before: &AcceptedSchemaSnapshot,
974    schema: &SchemaInfo,
975) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
976    let entity_name = schema
977        .entity_name()
978        .ok_or(SqlDdlBindError::MissingEntityName)?;
979
980    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
981        return Err(SqlDdlBindError::EntityMismatch {
982            sql_entity: statement.entity.clone(),
983            expected_entity: entity_name.to_string(),
984        });
985    }
986
987    if schema
988        .field_nullable(statement.column_name.as_str())
989        .is_some()
990    {
991        return Err(SqlDdlBindError::DuplicateColumn {
992            entity_name: entity_name.to_string(),
993            column_name: statement.column_name.clone(),
994        });
995    }
996
997    let (kind, storage_decode, leaf_codec) = persisted_field_contract_for_sql_column_type(
998        statement.column_type.as_str(),
999    )
1000    .ok_or_else(|| SqlDdlBindError::UnsupportedAlterTableAddColumnType {
1001        entity_name: entity_name.to_string(),
1002        column_name: statement.column_name.clone(),
1003        column_type: statement.column_type.clone(),
1004    })?;
1005    let default = schema_field_default_for_sql_default(
1006        entity_name,
1007        statement.column_name.as_str(),
1008        statement.default.as_ref(),
1009        &kind,
1010        statement.nullable,
1011        storage_decode,
1012        leaf_codec,
1013    )?;
1014    if !statement.nullable && default.is_none() {
1015        return Err(SqlDdlBindError::UnsupportedAlterTableAddColumnNotNull {
1016            entity_name: entity_name.to_string(),
1017            column_name: statement.column_name.clone(),
1018        });
1019    }
1020    let field = PersistedFieldSnapshot::new_with_write_policy_and_origin(
1021        next_sql_ddl_field_id(accepted_before),
1022        statement.column_name.clone(),
1023        next_sql_ddl_field_slot(accepted_before),
1024        kind,
1025        Vec::new(),
1026        statement.nullable,
1027        default,
1028        SchemaFieldWritePolicy::from_model_policies(None, None),
1029        PersistedFieldOrigin::SqlDdl,
1030        storage_decode,
1031        leaf_codec,
1032    );
1033
1034    Ok(BoundSqlDdlRequest {
1035        statement: BoundSqlDdlStatement::AddColumn(BoundSqlAddColumnRequest {
1036            entity_name: entity_name.to_string(),
1037            field,
1038        }),
1039    })
1040}
1041
1042fn alter_table_alter_column_bind_error(
1043    statement: &SqlAlterTableAlterColumnStatement,
1044    schema: &SchemaInfo,
1045) -> SqlDdlBindError {
1046    let Some(entity_name) = schema.entity_name() else {
1047        return SqlDdlBindError::MissingEntityName;
1048    };
1049
1050    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1051        return SqlDdlBindError::EntityMismatch {
1052            sql_entity: statement.entity.clone(),
1053            expected_entity: entity_name.to_string(),
1054        };
1055    }
1056
1057    if schema
1058        .field_nullable(statement.column_name.as_str())
1059        .is_none()
1060    {
1061        return SqlDdlBindError::UnknownColumn {
1062            entity_name: entity_name.to_string(),
1063            column_name: statement.column_name.clone(),
1064        };
1065    }
1066
1067    SqlDdlBindError::UnsupportedAlterTableAlterColumn {
1068        entity_name: entity_name.to_string(),
1069        column_name: statement.column_name.clone(),
1070        action: statement.action.label().to_string(),
1071    }
1072}
1073
1074fn bind_alter_table_alter_column_statement(
1075    statement: &SqlAlterTableAlterColumnStatement,
1076    accepted_before: &AcceptedSchemaSnapshot,
1077    schema: &SchemaInfo,
1078) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1079    let Some(entity_name) = schema.entity_name() else {
1080        return Err(SqlDdlBindError::MissingEntityName);
1081    };
1082
1083    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1084        return Err(SqlDdlBindError::EntityMismatch {
1085            sql_entity: statement.entity.clone(),
1086            expected_entity: entity_name.to_string(),
1087        });
1088    }
1089
1090    let field = accepted_before
1091        .persisted_snapshot()
1092        .fields()
1093        .iter()
1094        .find(|field| field.name() == statement.column_name)
1095        .ok_or_else(|| SqlDdlBindError::UnknownColumn {
1096            entity_name: entity_name.to_string(),
1097            column_name: statement.column_name.clone(),
1098        })?;
1099
1100    match &statement.action {
1101        SqlAlterColumnAction::SetDefault(default) => {
1102            reject_generated_field_default_change(entity_name, field)?;
1103            let default =
1104                schema_field_default_for_alter_column_default(entity_name, field, default)?;
1105            Ok(bind_alter_table_alter_column_default(
1106                entity_name,
1107                field,
1108                default,
1109                SqlDdlMutationKind::SetFieldDefault,
1110            ))
1111        }
1112        SqlAlterColumnAction::DropDefault => {
1113            if !field.default().is_none() {
1114                reject_generated_field_default_change(entity_name, field)?;
1115            }
1116            if !field.nullable() && !field.default().is_none() {
1117                return Err(SqlDdlBindError::UnsupportedAlterTableDropDefaultRequired {
1118                    entity_name: entity_name.to_string(),
1119                    column_name: statement.column_name.clone(),
1120                });
1121            }
1122            Ok(bind_alter_table_alter_column_default(
1123                entity_name,
1124                field,
1125                SchemaFieldDefault::None,
1126                SqlDdlMutationKind::DropFieldDefault,
1127            ))
1128        }
1129        SqlAlterColumnAction::SetNotNull => Ok(bind_alter_table_alter_column_nullability(
1130            entity_name,
1131            field,
1132            false,
1133            SqlDdlMutationKind::SetFieldNotNull,
1134        )?),
1135        SqlAlterColumnAction::DropNotNull => Ok(bind_alter_table_alter_column_nullability(
1136            entity_name,
1137            field,
1138            true,
1139            SqlDdlMutationKind::DropFieldNotNull,
1140        )?),
1141    }
1142}
1143
1144fn bind_alter_table_drop_column_statement(
1145    statement: &SqlAlterTableDropColumnStatement,
1146    accepted_before: &AcceptedSchemaSnapshot,
1147    schema: &SchemaInfo,
1148) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1149    let entity_name = schema
1150        .entity_name()
1151        .ok_or(SqlDdlBindError::MissingEntityName)?;
1152
1153    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
1154        return Err(SqlDdlBindError::EntityMismatch {
1155            sql_entity: statement.entity.clone(),
1156            expected_entity: entity_name.to_string(),
1157        });
1158    }
1159
1160    let accepted = accepted_before.persisted_snapshot();
1161    let Some(field) = accepted
1162        .fields()
1163        .iter()
1164        .find(|field| field.name() == statement.column_name)
1165    else {
1166        if statement.if_exists {
1167            return Ok(BoundSqlDdlRequest {
1168                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1169                    mutation_kind: SqlDdlMutationKind::DropField,
1170                    index_name: statement.column_name.clone(),
1171                    entity_name: entity_name.to_string(),
1172                    target_store: "-".to_string(),
1173                    field_path: vec![statement.column_name.clone()],
1174                }),
1175            });
1176        }
1177
1178        return Err(SqlDdlBindError::UnknownColumn {
1179            entity_name: entity_name.to_string(),
1180            column_name: statement.column_name.clone(),
1181        });
1182    };
1183
1184    if accepted.primary_key_field_id() == field.id() {
1185        return Err(SqlDdlBindError::PrimaryKeyFieldDropRejected {
1186            entity_name: entity_name.to_string(),
1187            column_name: statement.column_name.clone(),
1188        });
1189    }
1190
1191    if field.generated() {
1192        return Err(SqlDdlBindError::GeneratedFieldDropRejected {
1193            entity_name: entity_name.to_string(),
1194            column_name: statement.column_name.clone(),
1195        });
1196    }
1197
1198    if let Some(index_name) =
1199        resolve_sql_ddl_field_drop_dependent_index(accepted_before, field.id())
1200    {
1201        return Err(SqlDdlBindError::IndexedFieldDropRejected {
1202            entity_name: entity_name.to_string(),
1203            column_name: statement.column_name.clone(),
1204            index_name,
1205        });
1206    }
1207
1208    Err(SqlDdlBindError::UnsupportedAlterTableDropColumn {
1209        entity_name: entity_name.to_string(),
1210        column_name: statement.column_name.clone(),
1211    })
1212}
1213
1214fn bind_alter_table_alter_column_default(
1215    entity_name: &str,
1216    field: &PersistedFieldSnapshot,
1217    default: SchemaFieldDefault,
1218    mutation_kind: SqlDdlMutationKind,
1219) -> BoundSqlDdlRequest {
1220    if field.default() == &default {
1221        return BoundSqlDdlRequest {
1222            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1223                mutation_kind,
1224                index_name: field.name().to_string(),
1225                entity_name: entity_name.to_string(),
1226                target_store: entity_name.to_string(),
1227                field_path: vec![field.name().to_string()],
1228            }),
1229        };
1230    }
1231
1232    BoundSqlDdlRequest {
1233        statement: BoundSqlDdlStatement::AlterColumnDefault(BoundSqlAlterColumnDefaultRequest {
1234            entity_name: entity_name.to_string(),
1235            field: field.clone(),
1236            default,
1237            mutation_kind,
1238        }),
1239    }
1240}
1241
1242fn reject_generated_field_default_change(
1243    entity_name: &str,
1244    field: &PersistedFieldSnapshot,
1245) -> Result<(), SqlDdlBindError> {
1246    if field.generated() {
1247        return Err(SqlDdlBindError::GeneratedFieldDefaultChangeRejected {
1248            entity_name: entity_name.to_string(),
1249            column_name: field.name().to_string(),
1250        });
1251    }
1252
1253    Ok(())
1254}
1255
1256fn bind_alter_table_alter_column_nullability(
1257    entity_name: &str,
1258    field: &PersistedFieldSnapshot,
1259    nullable: bool,
1260    mutation_kind: SqlDdlMutationKind,
1261) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
1262    if field.nullable() == nullable {
1263        return Ok(BoundSqlDdlRequest {
1264            statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
1265                mutation_kind,
1266                index_name: field.name().to_string(),
1267                entity_name: entity_name.to_string(),
1268                target_store: entity_name.to_string(),
1269                field_path: vec![field.name().to_string()],
1270            }),
1271        });
1272    }
1273
1274    reject_generated_field_nullability_change(entity_name, field)?;
1275
1276    Ok(BoundSqlDdlRequest {
1277        statement: BoundSqlDdlStatement::AlterColumnNullability(
1278            BoundSqlAlterColumnNullabilityRequest {
1279                entity_name: entity_name.to_string(),
1280                field: field.clone(),
1281                nullable,
1282                mutation_kind,
1283            },
1284        ),
1285    })
1286}
1287
1288fn reject_generated_field_nullability_change(
1289    entity_name: &str,
1290    field: &PersistedFieldSnapshot,
1291) -> Result<(), SqlDdlBindError> {
1292    if field.generated() {
1293        return Err(SqlDdlBindError::GeneratedFieldNullabilityChangeRejected {
1294            entity_name: entity_name.to_string(),
1295            column_name: field.name().to_string(),
1296        });
1297    }
1298
1299    Ok(())
1300}
1301
1302fn schema_field_default_for_sql_default(
1303    entity_name: &str,
1304    column_name: &str,
1305    default: Option<&crate::value::Value>,
1306    kind: &PersistedFieldKind,
1307    nullable: bool,
1308    storage_decode: FieldStorageDecode,
1309    leaf_codec: LeafCodec,
1310) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1311    let Some(default) = default else {
1312        return Ok(SchemaFieldDefault::None);
1313    };
1314    if matches!(default, crate::value::Value::Null) {
1315        return Err(SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1316            entity_name: entity_name.to_string(),
1317            column_name: column_name.to_string(),
1318            detail: "NULL cannot be used as an accepted database default".to_string(),
1319        });
1320    }
1321
1322    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(kind, default)
1323        .unwrap_or_else(|| default.clone());
1324    let contract =
1325        AcceptedFieldDecodeContract::new(column_name, kind, nullable, storage_decode, leaf_codec);
1326    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1327        |error| SqlDdlBindError::InvalidAlterTableAddColumnDefault {
1328            entity_name: entity_name.to_string(),
1329            column_name: column_name.to_string(),
1330            detail: error.to_string(),
1331        },
1332    )?;
1333
1334    Ok(SchemaFieldDefault::SlotPayload(payload))
1335}
1336
1337fn schema_field_default_for_alter_column_default(
1338    entity_name: &str,
1339    field: &PersistedFieldSnapshot,
1340    default: &crate::value::Value,
1341) -> Result<SchemaFieldDefault, SqlDdlBindError> {
1342    if matches!(default, crate::value::Value::Null) {
1343        return Err(SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1344            entity_name: entity_name.to_string(),
1345            column_name: field.name().to_string(),
1346            detail: "NULL cannot be used as an accepted database default".to_string(),
1347        });
1348    }
1349
1350    let normalized = canonicalize_strict_sql_literal_for_persisted_kind(field.kind(), default)
1351        .unwrap_or_else(|| default.clone());
1352    let contract = AcceptedFieldDecodeContract::new(
1353        field.name(),
1354        field.kind(),
1355        field.nullable(),
1356        field.storage_decode(),
1357        field.leaf_codec(),
1358    );
1359    let payload = encode_runtime_value_for_accepted_field_contract(contract, &normalized).map_err(
1360        |error| SqlDdlBindError::InvalidAlterTableAlterColumnDefault {
1361            entity_name: entity_name.to_string(),
1362            column_name: field.name().to_string(),
1363            detail: error.to_string(),
1364        },
1365    )?;
1366
1367    Ok(SchemaFieldDefault::SlotPayload(payload))
1368}
1369
1370fn next_sql_ddl_field_id(accepted_before: &AcceptedSchemaSnapshot) -> FieldId {
1371    let next = accepted_before
1372        .persisted_snapshot()
1373        .fields()
1374        .iter()
1375        .map(|field| field.id().get())
1376        .max()
1377        .unwrap_or(0)
1378        .checked_add(1)
1379        .expect("accepted field IDs should not be exhausted");
1380
1381    FieldId::new(next)
1382}
1383
1384fn next_sql_ddl_field_slot(accepted_before: &AcceptedSchemaSnapshot) -> SchemaFieldSlot {
1385    let next = accepted_before
1386        .persisted_snapshot()
1387        .row_layout()
1388        .field_to_slot()
1389        .iter()
1390        .map(|(_, slot)| slot.get())
1391        .max()
1392        .unwrap_or(0)
1393        .checked_add(1)
1394        .expect("accepted row slots should not be exhausted");
1395
1396    SchemaFieldSlot::new(next)
1397}
1398
1399fn persisted_field_contract_for_sql_column_type(
1400    column_type: &str,
1401) -> Option<(PersistedFieldKind, FieldStorageDecode, LeafCodec)> {
1402    let normalized = column_type.trim().to_ascii_lowercase();
1403    match normalized.as_str() {
1404        "bool" | "boolean" => Some((
1405            PersistedFieldKind::Bool,
1406            FieldStorageDecode::ByKind,
1407            LeafCodec::Scalar(ScalarCodec::Bool),
1408        )),
1409        "int" | "integer" => Some((
1410            PersistedFieldKind::Int,
1411            FieldStorageDecode::ByKind,
1412            LeafCodec::Scalar(ScalarCodec::Int64),
1413        )),
1414        "nat" | "natural" => Some((
1415            PersistedFieldKind::Nat,
1416            FieldStorageDecode::ByKind,
1417            LeafCodec::Scalar(ScalarCodec::Nat64),
1418        )),
1419        "text" | "string" => Some((
1420            PersistedFieldKind::Text { max_len: None },
1421            FieldStorageDecode::ByKind,
1422            LeafCodec::Scalar(ScalarCodec::Text),
1423        )),
1424        _ => None,
1425    }
1426}
1427
1428#[derive(Clone, Debug, Eq, PartialEq)]
1429pub(in crate::db) enum BoundSqlDdlCreateIndexKey {
1430    FieldPath(BoundSqlDdlFieldPath),
1431    Expression(BoundSqlDdlExpressionKey),
1432}
1433
1434///
1435/// BoundSqlDdlExpressionKey
1436///
1437/// Accepted expression-index key target for SQL DDL binding.
1438///
1439#[derive(Clone, Debug, Eq, PartialEq)]
1440pub(in crate::db) struct BoundSqlDdlExpressionKey {
1441    op: PersistedIndexExpressionOp,
1442    source: BoundSqlDdlFieldPath,
1443    canonical_sql: String,
1444}
1445
1446impl BoundSqlDdlExpressionKey {
1447    /// Return the accepted expression operation.
1448    #[must_use]
1449    pub(in crate::db) const fn op(&self) -> PersistedIndexExpressionOp {
1450        self.op
1451    }
1452
1453    /// Borrow the accepted source field path.
1454    #[must_use]
1455    pub(in crate::db) const fn source(&self) -> &BoundSqlDdlFieldPath {
1456        &self.source
1457    }
1458
1459    /// Borrow the SQL-facing canonical expression text.
1460    #[must_use]
1461    pub(in crate::db) const fn canonical_sql(&self) -> &str {
1462        self.canonical_sql.as_str()
1463    }
1464}
1465
1466fn bind_create_index_key_item(
1467    key_item: &SqlCreateIndexKeyItem,
1468    entity_name: &str,
1469    schema: &SchemaInfo,
1470) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1471    match key_item {
1472        SqlCreateIndexKeyItem::FieldPath(field_path) => {
1473            bind_create_index_field_path(field_path.as_str(), entity_name, schema)
1474                .map(BoundSqlDdlCreateIndexKey::FieldPath)
1475        }
1476        SqlCreateIndexKeyItem::Expression(expression) => {
1477            bind_create_index_expression_key(expression, entity_name, schema)
1478        }
1479    }
1480}
1481
1482fn bind_create_index_expression_key(
1483    expression: &SqlCreateIndexExpressionKey,
1484    entity_name: &str,
1485    schema: &SchemaInfo,
1486) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
1487    let source = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
1488
1489    Ok(BoundSqlDdlCreateIndexKey::Expression(
1490        BoundSqlDdlExpressionKey {
1491            op: expression_op_from_sql_function(expression.function),
1492            source,
1493            canonical_sql: expression.canonical_sql(),
1494        },
1495    ))
1496}
1497
1498const fn expression_op_from_sql_function(
1499    function: crate::db::sql::parser::SqlCreateIndexExpressionFunction,
1500) -> PersistedIndexExpressionOp {
1501    match function {
1502        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Lower => {
1503            PersistedIndexExpressionOp::Lower
1504        }
1505        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Upper => {
1506            PersistedIndexExpressionOp::Upper
1507        }
1508        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Trim => {
1509            PersistedIndexExpressionOp::Trim
1510        }
1511    }
1512}
1513
1514fn key_items_are_field_path_only(key_items: &[BoundSqlDdlCreateIndexKey]) -> bool {
1515    key_items
1516        .iter()
1517        .all(|key_item| matches!(key_item, BoundSqlDdlCreateIndexKey::FieldPath(_)))
1518}
1519
1520fn create_index_field_path_report_items(
1521    key_items: &[BoundSqlDdlCreateIndexKey],
1522) -> Vec<BoundSqlDdlFieldPath> {
1523    key_items
1524        .iter()
1525        .map(|key_item| match key_item {
1526            BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.clone(),
1527            BoundSqlDdlCreateIndexKey::Expression(expression) => expression.source().clone(),
1528        })
1529        .collect()
1530}
1531
1532fn bind_create_index_field_path(
1533    field_path: &str,
1534    entity_name: &str,
1535    schema: &SchemaInfo,
1536) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
1537    let mut path = field_path
1538        .split('.')
1539        .map(str::trim)
1540        .filter(|segment| !segment.is_empty());
1541    let Some(root) = path.next() else {
1542        return Err(SqlDdlBindError::UnknownFieldPath {
1543            entity_name: entity_name.to_string(),
1544            field_path: field_path.to_string(),
1545        });
1546    };
1547    let segments = path.map(str::to_string).collect::<Vec<_>>();
1548
1549    let capabilities = if segments.is_empty() {
1550        schema.sql_capabilities(root)
1551    } else {
1552        schema.nested_sql_capabilities(root, segments.as_slice())
1553    }
1554    .ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
1555        entity_name: entity_name.to_string(),
1556        field_path: field_path.to_string(),
1557    })?;
1558
1559    if !capabilities.orderable() {
1560        return Err(SqlDdlBindError::FieldPathNotIndexable {
1561            field_path: field_path.to_string(),
1562        });
1563    }
1564
1565    let mut accepted_path = Vec::with_capacity(segments.len() + 1);
1566    accepted_path.push(root.to_string());
1567    accepted_path.extend(segments.iter().cloned());
1568
1569    Ok(BoundSqlDdlFieldPath {
1570        root: root.to_string(),
1571        segments,
1572        accepted_path,
1573    })
1574}
1575
1576fn find_field_path_index_by_name<'a>(
1577    schema: &'a SchemaInfo,
1578    index_name: &str,
1579) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
1580    schema
1581        .field_path_indexes()
1582        .iter()
1583        .find(|index| index.name() == index_name)
1584}
1585
1586fn existing_field_path_index_matches_request(
1587    index: &crate::db::schema::SchemaIndexInfo,
1588    field_paths: &[BoundSqlDdlFieldPath],
1589    predicate_sql: Option<&str>,
1590    uniqueness: SqlCreateIndexUniqueness,
1591) -> bool {
1592    let fields = index.fields();
1593
1594    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1595        && index.predicate_sql() == predicate_sql
1596        && fields.len() == field_paths.len()
1597        && fields
1598            .iter()
1599            .zip(field_paths)
1600            .all(|(field, requested)| field.path() == requested.accepted_path())
1601}
1602
1603fn find_expression_index_by_name<'a>(
1604    schema: &'a SchemaInfo,
1605    index_name: &str,
1606) -> Option<&'a SchemaExpressionIndexInfo> {
1607    schema
1608        .expression_indexes()
1609        .iter()
1610        .find(|index| index.name() == index_name)
1611}
1612
1613fn existing_expression_index_matches_request(
1614    index: &SchemaExpressionIndexInfo,
1615    key_items: &[BoundSqlDdlCreateIndexKey],
1616    predicate_sql: Option<&str>,
1617    uniqueness: SqlCreateIndexUniqueness,
1618) -> bool {
1619    let existing_key_items = index.key_items();
1620
1621    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
1622        && index.predicate_sql() == predicate_sql
1623        && existing_key_items.len() == key_items.len()
1624        && existing_key_items
1625            .iter()
1626            .zip(key_items)
1627            .all(existing_expression_key_item_matches_request)
1628}
1629
1630fn existing_expression_key_item_matches_request(
1631    existing: (
1632        &SchemaExpressionIndexKeyItemInfo,
1633        &BoundSqlDdlCreateIndexKey,
1634    ),
1635) -> bool {
1636    let (existing, requested) = existing;
1637    match (existing, requested) {
1638        (
1639            SchemaExpressionIndexKeyItemInfo::FieldPath(existing),
1640            BoundSqlDdlCreateIndexKey::FieldPath(requested),
1641        ) => existing.path() == requested.accepted_path(),
1642        (
1643            SchemaExpressionIndexKeyItemInfo::Expression(existing),
1644            BoundSqlDdlCreateIndexKey::Expression(requested),
1645        ) => existing_expression_component_matches_request(
1646            existing.op(),
1647            existing.source().path(),
1648            existing.canonical_text(),
1649            requested,
1650        ),
1651        _ => false,
1652    }
1653}
1654
1655fn existing_expression_component_matches_request(
1656    existing_op: PersistedIndexExpressionOp,
1657    existing_path: &[String],
1658    existing_canonical_text: &str,
1659    requested: &BoundSqlDdlExpressionKey,
1660) -> bool {
1661    let requested_path = requested.source().accepted_path();
1662    let requested_canonical_text = format!("expr:v1:{}", requested.canonical_sql());
1663
1664    existing_op == requested.op()
1665        && existing_path == requested_path
1666        && existing_canonical_text == requested_canonical_text
1667}
1668
1669fn reject_duplicate_expression_index(
1670    key_items: &[BoundSqlDdlCreateIndexKey],
1671    predicate_sql: Option<&str>,
1672    schema: &SchemaInfo,
1673) -> Result<(), SqlDdlBindError> {
1674    let Some(existing_index) = schema.expression_indexes().iter().find(|index| {
1675        existing_expression_index_matches_request(
1676            index,
1677            key_items,
1678            predicate_sql,
1679            if index.unique() {
1680                SqlCreateIndexUniqueness::Unique
1681            } else {
1682                SqlCreateIndexUniqueness::NonUnique
1683            },
1684        )
1685    }) else {
1686        return Ok(());
1687    };
1688
1689    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1690        field_path: ddl_key_item_report(key_items).join(","),
1691        existing_index: existing_index.name().to_string(),
1692    })
1693}
1694
1695fn reject_duplicate_field_path_index(
1696    field_paths: &[BoundSqlDdlFieldPath],
1697    predicate_sql: Option<&str>,
1698    schema: &SchemaInfo,
1699) -> Result<(), SqlDdlBindError> {
1700    let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
1701        let fields = index.fields();
1702        index.predicate_sql() == predicate_sql
1703            && fields.len() == field_paths.len()
1704            && fields
1705                .iter()
1706                .zip(field_paths)
1707                .all(|(field, requested)| field.path() == requested.accepted_path())
1708    }) else {
1709        return Ok(());
1710    };
1711
1712    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1713        field_path: ddl_field_path_report(field_paths).join(","),
1714        existing_index: existing_index.name().to_string(),
1715    })
1716}
1717
1718fn candidate_index_snapshot(
1719    index_name: &str,
1720    key_items: &[BoundSqlDdlCreateIndexKey],
1721    predicate_sql: Option<&str>,
1722    uniqueness: SqlCreateIndexUniqueness,
1723    schema: &SchemaInfo,
1724    index_store_path: &'static str,
1725) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
1726    let key = if key_items_are_field_path_only(key_items) {
1727        PersistedIndexKeySnapshot::FieldPath(
1728            key_items
1729                .iter()
1730                .map(|key_item| {
1731                    let BoundSqlDdlCreateIndexKey::FieldPath(field_path) = key_item else {
1732                        unreachable!("field-path-only index checked before field-path lowering");
1733                    };
1734
1735                    accepted_index_field_path_snapshot(schema, field_path)
1736                })
1737                .collect::<Result<Vec<_>, _>>()?,
1738        )
1739    } else {
1740        PersistedIndexKeySnapshot::Items(
1741            key_items
1742                .iter()
1743                .map(|key_item| match key_item {
1744                    BoundSqlDdlCreateIndexKey::FieldPath(field_path) => {
1745                        accepted_index_field_path_snapshot(schema, field_path)
1746                            .map(PersistedIndexKeyItemSnapshot::FieldPath)
1747                    }
1748                    BoundSqlDdlCreateIndexKey::Expression(expression) => {
1749                        accepted_index_expression_snapshot(schema, expression)
1750                    }
1751                })
1752                .collect::<Result<Vec<_>, _>>()?,
1753        )
1754    };
1755
1756    Ok(PersistedIndexSnapshot::new_sql_ddl(
1757        schema.next_secondary_index_ordinal(),
1758        index_name.to_string(),
1759        index_store_path.to_string(),
1760        matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
1761        key,
1762        predicate_sql.map(str::to_string),
1763    ))
1764}
1765
1766fn accepted_index_field_path_snapshot(
1767    schema: &SchemaInfo,
1768    field_path: &BoundSqlDdlFieldPath,
1769) -> Result<crate::db::schema::PersistedIndexFieldPathSnapshot, SqlDdlBindError> {
1770    schema
1771        .accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
1772        .ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
1773            field_path: field_path.accepted_path().join("."),
1774        })
1775}
1776
1777fn accepted_index_expression_snapshot(
1778    schema: &SchemaInfo,
1779    expression: &BoundSqlDdlExpressionKey,
1780) -> Result<PersistedIndexKeyItemSnapshot, SqlDdlBindError> {
1781    let source = accepted_index_field_path_snapshot(schema, expression.source())?;
1782    let Some(output_kind) = expression_output_kind(expression.op(), source.kind()) else {
1783        return Err(SqlDdlBindError::FieldPathNotIndexable {
1784            field_path: expression.source().accepted_path().join("."),
1785        });
1786    };
1787
1788    Ok(PersistedIndexKeyItemSnapshot::Expression(Box::new(
1789        PersistedIndexExpressionSnapshot::new(
1790            expression.op(),
1791            source.clone(),
1792            source.kind().clone(),
1793            output_kind,
1794            format!("expr:v1:{}", expression.canonical_sql()),
1795        ),
1796    )))
1797}
1798
1799fn expression_output_kind(
1800    op: PersistedIndexExpressionOp,
1801    source_kind: &PersistedFieldKind,
1802) -> Option<PersistedFieldKind> {
1803    match op {
1804        PersistedIndexExpressionOp::Lower
1805        | PersistedIndexExpressionOp::Upper
1806        | PersistedIndexExpressionOp::Trim
1807        | PersistedIndexExpressionOp::LowerTrim => {
1808            if matches!(source_kind, PersistedFieldKind::Text { .. }) {
1809                Some(source_kind.clone())
1810            } else {
1811                None
1812            }
1813        }
1814        PersistedIndexExpressionOp::Date => {
1815            if matches!(
1816                source_kind,
1817                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1818            ) {
1819                Some(PersistedFieldKind::Date)
1820            } else {
1821                None
1822            }
1823        }
1824        PersistedIndexExpressionOp::Year
1825        | PersistedIndexExpressionOp::Month
1826        | PersistedIndexExpressionOp::Day => {
1827            if matches!(
1828                source_kind,
1829                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1830            ) {
1831                Some(PersistedFieldKind::Int)
1832            } else {
1833                None
1834            }
1835        }
1836    }
1837}
1838
1839fn validated_create_index_predicate_sql(
1840    predicate_sql: Option<&str>,
1841    schema: &SchemaInfo,
1842) -> Result<Option<String>, SqlDdlBindError> {
1843    let Some(predicate_sql) = predicate_sql else {
1844        return Ok(None);
1845    };
1846    let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
1847        SqlDdlBindError::InvalidFilteredIndexPredicate {
1848            detail: error.to_string(),
1849        }
1850    })?;
1851    validate_predicate(schema, &predicate).map_err(|error| {
1852        SqlDdlBindError::InvalidFilteredIndexPredicate {
1853            detail: error.to_string(),
1854        }
1855    })?;
1856
1857    Ok(Some(predicate_sql.to_string()))
1858}
1859
1860fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
1861    match field_paths {
1862        [field_path] => field_path.accepted_path().to_vec(),
1863        _ => vec![
1864            field_paths
1865                .iter()
1866                .map(|field_path| field_path.accepted_path().join("."))
1867                .collect::<Vec<_>>()
1868                .join(","),
1869        ],
1870    }
1871}
1872
1873fn ddl_key_item_report(key_items: &[BoundSqlDdlCreateIndexKey]) -> Vec<String> {
1874    match key_items {
1875        [key_item] => vec![ddl_key_item_text(key_item)],
1876        _ => vec![
1877            key_items
1878                .iter()
1879                .map(ddl_key_item_text)
1880                .collect::<Vec<_>>()
1881                .join(","),
1882        ],
1883    }
1884}
1885
1886fn ddl_key_item_text(key_item: &BoundSqlDdlCreateIndexKey) -> String {
1887    match key_item {
1888        BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.accepted_path().join("."),
1889        BoundSqlDdlCreateIndexKey::Expression(expression) => expression.canonical_sql().to_string(),
1890    }
1891}
1892
1893/// Lower one bound SQL DDL request through schema mutation admission.
1894pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
1895    request: &BoundSqlDdlRequest,
1896) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
1897    match request.statement() {
1898        BoundSqlDdlStatement::AddColumn(add) => {
1899            Ok(admit_sql_ddl_field_addition_candidate(add.field()))
1900        }
1901        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
1902            Ok(admit_sql_ddl_field_default_candidate(alter.field()))
1903        }
1904        BoundSqlDdlStatement::AlterColumnNullability(alter) => {
1905            Ok(admit_sql_ddl_field_nullability_candidate(alter.field()))
1906        }
1907        BoundSqlDdlStatement::CreateIndex(create) => {
1908            if create.candidate_index().key().is_field_path_only() {
1909                admit_sql_ddl_field_path_index_candidate(create.candidate_index())
1910            } else {
1911                admit_sql_ddl_expression_index_candidate(create.candidate_index())
1912            }
1913        }
1914        BoundSqlDdlStatement::DropIndex(drop) => {
1915            admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
1916        }
1917        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1918    }
1919    .map_err(SqlDdlLoweringError::MutationAdmission)
1920}
1921
1922/// Derive the accepted-after schema snapshot for one bound SQL DDL request.
1923pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
1924    accepted_before: &AcceptedSchemaSnapshot,
1925    request: &BoundSqlDdlRequest,
1926) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
1927    match request.statement() {
1928        BoundSqlDdlStatement::AddColumn(add) => {
1929            derive_sql_ddl_field_addition_accepted_after(accepted_before, add.field().clone())
1930        }
1931        BoundSqlDdlStatement::AlterColumnDefault(alter) => {
1932            derive_sql_ddl_field_default_accepted_after(
1933                accepted_before,
1934                alter.field_name(),
1935                alter.default().clone(),
1936            )
1937        }
1938        BoundSqlDdlStatement::AlterColumnNullability(alter) => {
1939            derive_sql_ddl_field_nullability_accepted_after(
1940                accepted_before,
1941                alter.field_name(),
1942                alter.nullable(),
1943            )
1944        }
1945        BoundSqlDdlStatement::CreateIndex(create) => {
1946            if create.candidate_index().key().is_field_path_only() {
1947                derive_sql_ddl_field_path_index_accepted_after(
1948                    accepted_before,
1949                    create.candidate_index().clone(),
1950                )
1951            } else {
1952                derive_sql_ddl_expression_index_accepted_after(
1953                    accepted_before,
1954                    create.candidate_index().clone(),
1955                )
1956            }
1957        }
1958        BoundSqlDdlStatement::DropIndex(drop) => {
1959            derive_sql_ddl_secondary_index_drop_accepted_after(
1960                accepted_before,
1961                drop.dropped_index(),
1962            )
1963        }
1964        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1965    }
1966    .map_err(SqlDdlLoweringError::MutationAdmission)
1967}
1968
1969fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
1970    match bound.statement() {
1971        BoundSqlDdlStatement::AddColumn(add) => SqlDdlPreparationReport {
1972            mutation_kind: if add.field().default().is_none() {
1973                SqlDdlMutationKind::AddNullableField
1974            } else {
1975                SqlDdlMutationKind::AddDefaultedField
1976            },
1977            target_index: add.field().name().to_string(),
1978            target_store: add.entity_name().to_string(),
1979            field_path: vec![add.field().name().to_string()],
1980            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1981            rows_scanned: 0,
1982            index_keys_written: 0,
1983        },
1984        BoundSqlDdlStatement::AlterColumnDefault(alter) => SqlDdlPreparationReport {
1985            mutation_kind: alter.mutation_kind(),
1986            target_index: alter.field_name().to_string(),
1987            target_store: alter.entity_name().to_string(),
1988            field_path: vec![alter.field_name().to_string()],
1989            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1990            rows_scanned: 0,
1991            index_keys_written: 0,
1992        },
1993        BoundSqlDdlStatement::AlterColumnNullability(alter) => SqlDdlPreparationReport {
1994            mutation_kind: alter.mutation_kind(),
1995            target_index: alter.field_name().to_string(),
1996            target_store: alter.entity_name().to_string(),
1997            field_path: vec![alter.field_name().to_string()],
1998            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1999            rows_scanned: 0,
2000            index_keys_written: 0,
2001        },
2002        BoundSqlDdlStatement::CreateIndex(create) => {
2003            let target = create.candidate_index();
2004
2005            SqlDdlPreparationReport {
2006                mutation_kind: if target.key().is_field_path_only() {
2007                    SqlDdlMutationKind::AddFieldPathIndex
2008                } else {
2009                    SqlDdlMutationKind::AddExpressionIndex
2010                },
2011                target_index: target.name().to_string(),
2012                target_store: target.store().to_string(),
2013                field_path: ddl_key_item_report(create.key_items()),
2014                execution_status: SqlDdlExecutionStatus::PreparedOnly,
2015                rows_scanned: 0,
2016                index_keys_written: 0,
2017            }
2018        }
2019        BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
2020            mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
2021            target_index: drop.index_name().to_string(),
2022            target_store: drop.dropped_index().store().to_string(),
2023            field_path: drop.field_path().to_vec(),
2024            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2025            rows_scanned: 0,
2026            index_keys_written: 0,
2027        },
2028        BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
2029            mutation_kind: no_op.mutation_kind(),
2030            target_index: no_op.index_name().to_string(),
2031            target_store: no_op.target_store().to_string(),
2032            field_path: no_op.field_path().to_vec(),
2033            execution_status: SqlDdlExecutionStatus::PreparedOnly,
2034            rows_scanned: 0,
2035            index_keys_written: 0,
2036        },
2037    }
2038}