Skip to main content

icydb_core/db/sql/
ddl.rs

1//! Module: db::sql::ddl
2//! Responsibility: bind parsed SQL DDL to accepted schema catalog contracts.
3//! Does not own: mutation planning, physical index rebuilds, or SQL execution.
4//! Boundary: translates parser-owned DDL syntax into catalog-native requests.
5
6#![allow(
7    dead_code,
8    reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
9)]
10
11use crate::db::{
12    predicate::parse_sql_predicate,
13    query::predicate::validate_predicate,
14    schema::{
15        AcceptedSchemaSnapshot, PersistedIndexKeySnapshot, PersistedIndexSnapshot,
16        SchemaDdlAcceptedSnapshotDerivation, SchemaDdlIndexDropCandidateError,
17        SchemaDdlMutationAdmission, SchemaDdlMutationAdmissionError, SchemaInfo,
18        admit_sql_ddl_field_path_index_candidate, admit_sql_ddl_secondary_index_drop_candidate,
19        derive_sql_ddl_field_path_index_accepted_after,
20        derive_sql_ddl_secondary_index_drop_accepted_after,
21        resolve_sql_ddl_secondary_index_drop_candidate,
22    },
23    sql::{
24        identifier::identifiers_tail_match,
25        parser::{
26            SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem, SqlCreateIndexStatement,
27            SqlCreateIndexUniqueness, SqlDdlStatement, SqlDropIndexStatement, SqlStatement,
28        },
29    },
30};
31use thiserror::Error as ThisError;
32
33///
34/// PreparedSqlDdlCommand
35///
36/// Fully prepared SQL DDL command. This is intentionally not executable yet:
37/// it packages the accepted-catalog binding, accepted-after derivation, and
38/// schema mutation admission proof for the future execution boundary.
39///
40#[derive(Clone, Debug, Eq, PartialEq)]
41pub(in crate::db) struct PreparedSqlDdlCommand {
42    bound: BoundSqlDdlRequest,
43    derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
44    report: SqlDdlPreparationReport,
45}
46
47impl PreparedSqlDdlCommand {
48    /// Borrow the accepted-catalog-bound DDL request.
49    #[must_use]
50    pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
51        &self.bound
52    }
53
54    /// Borrow the accepted-after derivation proof.
55    #[must_use]
56    pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
57        self.derivation.as_ref()
58    }
59
60    /// Borrow the developer-facing preparation report.
61    #[must_use]
62    pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
63        &self.report
64    }
65
66    /// Return whether this prepared command needs schema or storage mutation.
67    #[must_use]
68    pub(in crate::db) const fn mutates_schema(&self) -> bool {
69        self.derivation.is_some()
70    }
71}
72
73///
74/// SqlDdlPreparationReport
75///
76/// Compact report for a DDL command that has passed all pre-execution
77/// frontend and schema-mutation checks.
78///
79#[derive(Clone, Debug, Eq, PartialEq)]
80pub struct SqlDdlPreparationReport {
81    mutation_kind: SqlDdlMutationKind,
82    target_index: String,
83    target_store: String,
84    field_path: Vec<String>,
85    execution_status: SqlDdlExecutionStatus,
86    rows_scanned: usize,
87    index_keys_written: usize,
88}
89
90impl SqlDdlPreparationReport {
91    /// Return the prepared DDL mutation kind.
92    #[must_use]
93    pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
94        self.mutation_kind
95    }
96
97    /// Borrow the target accepted index name.
98    #[must_use]
99    pub const fn target_index(&self) -> &str {
100        self.target_index.as_str()
101    }
102
103    /// Borrow the target accepted index store path.
104    #[must_use]
105    pub const fn target_store(&self) -> &str {
106        self.target_store.as_str()
107    }
108
109    /// Borrow the target field path.
110    #[must_use]
111    pub const fn field_path(&self) -> &[String] {
112        self.field_path.as_slice()
113    }
114
115    /// Return the execution status captured by this DDL report.
116    #[must_use]
117    pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
118        self.execution_status
119    }
120
121    /// Return rows scanned by DDL execution.
122    #[must_use]
123    pub const fn rows_scanned(&self) -> usize {
124        self.rows_scanned
125    }
126
127    /// Return index keys written by DDL execution.
128    #[must_use]
129    pub const fn index_keys_written(&self) -> usize {
130        self.index_keys_written
131    }
132
133    pub(in crate::db) const fn with_execution_status(
134        mut self,
135        execution_status: SqlDdlExecutionStatus,
136    ) -> Self {
137        self.execution_status = execution_status;
138        self
139    }
140
141    pub(in crate::db) const fn with_execution_metrics(
142        mut self,
143        rows_scanned: usize,
144        index_keys_written: usize,
145    ) -> Self {
146        self.rows_scanned = rows_scanned;
147        self.index_keys_written = index_keys_written;
148        self
149    }
150}
151
152///
153/// SqlDdlMutationKind
154///
155/// Developer-facing SQL DDL mutation kind.
156///
157#[derive(Clone, Copy, Debug, Eq, PartialEq)]
158pub enum SqlDdlMutationKind {
159    AddFieldPathIndex,
160    DropSecondaryIndex,
161}
162
163impl SqlDdlMutationKind {
164    /// Return the stable diagnostic label for this DDL mutation kind.
165    #[must_use]
166    pub const fn as_str(self) -> &'static str {
167        match self {
168            Self::AddFieldPathIndex => "add_field_path_index",
169            Self::DropSecondaryIndex => "drop_secondary_index",
170        }
171    }
172}
173
174///
175/// SqlDdlExecutionStatus
176///
177/// SQL DDL execution state at the current boundary.
178///
179#[derive(Clone, Copy, Debug, Eq, PartialEq)]
180pub enum SqlDdlExecutionStatus {
181    PreparedOnly,
182    Published,
183    NoOp,
184}
185
186impl SqlDdlExecutionStatus {
187    /// Return the stable diagnostic label for this execution status.
188    #[must_use]
189    pub const fn as_str(self) -> &'static str {
190        match self {
191            Self::PreparedOnly => "prepared_only",
192            Self::Published => "published",
193            Self::NoOp => "no_op",
194        }
195    }
196}
197
198///
199/// BoundSqlDdlRequest
200///
201/// Accepted-catalog SQL DDL request after parser syntax has been resolved
202/// against one runtime schema snapshot.
203///
204#[derive(Clone, Debug, Eq, PartialEq)]
205pub(in crate::db) struct BoundSqlDdlRequest {
206    statement: BoundSqlDdlStatement,
207}
208
209impl BoundSqlDdlRequest {
210    /// Borrow the bound statement payload.
211    #[must_use]
212    pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
213        &self.statement
214    }
215}
216
217///
218/// BoundSqlDdlStatement
219///
220/// Catalog-resolved DDL statement vocabulary.
221///
222#[derive(Clone, Debug, Eq, PartialEq)]
223pub(in crate::db) enum BoundSqlDdlStatement {
224    CreateIndex(BoundSqlCreateIndexRequest),
225    DropIndex(BoundSqlDropIndexRequest),
226    NoOp(BoundSqlDdlNoOpRequest),
227}
228
229///
230/// BoundSqlDdlNoOpRequest
231///
232/// Catalog-resolved idempotent DDL request that is already satisfied or absent.
233///
234#[derive(Clone, Debug, Eq, PartialEq)]
235pub(in crate::db) struct BoundSqlDdlNoOpRequest {
236    mutation_kind: SqlDdlMutationKind,
237    index_name: String,
238    entity_name: String,
239    target_store: String,
240    field_path: Vec<String>,
241}
242
243impl BoundSqlDdlNoOpRequest {
244    /// Return the user-facing mutation family this no-op belongs to.
245    #[must_use]
246    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
247        self.mutation_kind
248    }
249
250    /// Borrow the requested index name.
251    #[must_use]
252    pub(in crate::db) const fn index_name(&self) -> &str {
253        self.index_name.as_str()
254    }
255
256    /// Borrow the accepted entity name that owns this request.
257    #[must_use]
258    pub(in crate::db) const fn entity_name(&self) -> &str {
259        self.entity_name.as_str()
260    }
261
262    /// Borrow the accepted index store path, or `-` when no target exists.
263    #[must_use]
264    pub(in crate::db) const fn target_store(&self) -> &str {
265        self.target_store.as_str()
266    }
267
268    /// Borrow the target field path, empty when no target exists.
269    #[must_use]
270    pub(in crate::db) const fn field_path(&self) -> &[String] {
271        self.field_path.as_slice()
272    }
273}
274
275///
276/// BoundSqlCreateIndexRequest
277///
278/// Catalog-resolved request for adding one field-path secondary index.
279///
280#[derive(Clone, Debug, Eq, PartialEq)]
281pub(in crate::db) struct BoundSqlCreateIndexRequest {
282    index_name: String,
283    entity_name: String,
284    field_paths: Vec<BoundSqlDdlFieldPath>,
285    candidate_index: PersistedIndexSnapshot,
286}
287
288impl BoundSqlCreateIndexRequest {
289    /// Borrow the requested index name.
290    #[must_use]
291    pub(in crate::db) const fn index_name(&self) -> &str {
292        self.index_name.as_str()
293    }
294
295    /// Borrow the accepted entity name that owns this request.
296    #[must_use]
297    pub(in crate::db) const fn entity_name(&self) -> &str {
298        self.entity_name.as_str()
299    }
300
301    /// Borrow the accepted field-path targets.
302    #[must_use]
303    pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
304        self.field_paths.as_slice()
305    }
306
307    /// Borrow the candidate accepted index snapshot for mutation admission.
308    #[must_use]
309    pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
310        &self.candidate_index
311    }
312}
313
314///
315/// BoundSqlDropIndexRequest
316///
317/// Catalog-resolved request for dropping one DDL-published secondary index.
318///
319#[derive(Clone, Debug, Eq, PartialEq)]
320pub(in crate::db) struct BoundSqlDropIndexRequest {
321    index_name: String,
322    entity_name: String,
323    dropped_index: PersistedIndexSnapshot,
324    field_path: Vec<String>,
325}
326
327impl BoundSqlDropIndexRequest {
328    /// Borrow the requested index name.
329    #[must_use]
330    pub(in crate::db) const fn index_name(&self) -> &str {
331        self.index_name.as_str()
332    }
333
334    /// Borrow the accepted entity name that owns this request.
335    #[must_use]
336    pub(in crate::db) const fn entity_name(&self) -> &str {
337        self.entity_name.as_str()
338    }
339
340    /// Borrow the accepted index snapshot that will be removed.
341    #[must_use]
342    pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
343        &self.dropped_index
344    }
345
346    /// Borrow the dropped field-path target.
347    #[must_use]
348    pub(in crate::db) const fn field_path(&self) -> &[String] {
349        self.field_path.as_slice()
350    }
351}
352
353///
354/// BoundSqlDdlFieldPath
355///
356/// Accepted field-path target for SQL DDL binding.
357///
358#[derive(Clone, Debug, Eq, PartialEq)]
359pub(in crate::db) struct BoundSqlDdlFieldPath {
360    root: String,
361    segments: Vec<String>,
362    accepted_path: Vec<String>,
363}
364
365impl BoundSqlDdlFieldPath {
366    /// Borrow the top-level field name.
367    #[must_use]
368    pub(in crate::db) const fn root(&self) -> &str {
369        self.root.as_str()
370    }
371
372    /// Borrow nested path segments below the top-level field.
373    #[must_use]
374    pub(in crate::db) const fn segments(&self) -> &[String] {
375        self.segments.as_slice()
376    }
377
378    /// Borrow the full accepted field path used by index metadata.
379    #[must_use]
380    pub(in crate::db) const fn accepted_path(&self) -> &[String] {
381        self.accepted_path.as_slice()
382    }
383}
384
385///
386/// SqlDdlBindError
387///
388/// Typed fail-closed reasons for SQL DDL catalog binding.
389///
390#[derive(Debug, Eq, PartialEq, ThisError)]
391pub(in crate::db) enum SqlDdlBindError {
392    #[error("SQL DDL binder requires a DDL statement")]
393    NotDdl,
394
395    #[error("accepted schema does not expose an entity name")]
396    MissingEntityName,
397
398    #[error("accepted schema does not expose an entity path")]
399    MissingEntityPath,
400
401    #[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
402    EntityMismatch {
403        sql_entity: String,
404        expected_entity: String,
405    },
406
407    #[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
408    UnknownFieldPath {
409        entity_name: String,
410        field_path: String,
411    },
412
413    #[error("field path '{field_path}' is not indexable")]
414    FieldPathNotIndexable { field_path: String },
415
416    #[error("field path '{field_path}' depends on generated-only metadata")]
417    FieldPathNotAcceptedCatalogBacked { field_path: String },
418
419    #[error("invalid filtered index predicate: {detail}")]
420    InvalidFilteredIndexPredicate { detail: String },
421
422    #[error("SQL DDL expression index keys are not executable in this release: {expression}")]
423    UnsupportedExpressionIndexKey { expression: String },
424
425    #[error("index name '{index_name}' already exists in the accepted schema")]
426    DuplicateIndexName { index_name: String },
427
428    #[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
429    DuplicateFieldPathIndex {
430        field_path: String,
431        existing_index: String,
432    },
433
434    #[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
435    UnknownIndex {
436        entity_name: String,
437        index_name: String,
438    },
439
440    #[error(
441        "index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
442    )]
443    GeneratedIndexDropRejected { index_name: String },
444
445    #[error(
446        "index '{index_name}' is not a supported DDL-droppable field-path index; SQL DDL can currently drop only field-path indexes created through SQL DDL"
447    )]
448    UnsupportedDropIndex { index_name: String },
449}
450
451///
452/// SqlDdlLoweringError
453///
454/// Typed fail-closed reasons while lowering bound DDL into schema mutation
455/// admission.
456///
457#[derive(Debug, Eq, PartialEq, ThisError)]
458pub(in crate::db) enum SqlDdlLoweringError {
459    #[error("SQL DDL lowering requires a supported DDL statement")]
460    UnsupportedStatement,
461
462    #[error("schema mutation admission rejected DDL candidate: {0:?}")]
463    MutationAdmission(SchemaDdlMutationAdmissionError),
464}
465
466///
467/// SqlDdlPrepareError
468///
469/// Typed fail-closed preparation errors for SQL DDL.
470///
471#[derive(Debug, Eq, PartialEq, ThisError)]
472pub(in crate::db) enum SqlDdlPrepareError {
473    #[error("{0}")]
474    Bind(#[from] SqlDdlBindError),
475
476    #[error("{0}")]
477    Lowering(#[from] SqlDdlLoweringError),
478}
479
480/// Prepare one parsed SQL DDL statement through every pre-execution proof.
481pub(in crate::db) fn prepare_sql_ddl_statement(
482    statement: &SqlStatement,
483    accepted_before: &AcceptedSchemaSnapshot,
484    schema: &SchemaInfo,
485    index_store_path: &'static str,
486) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
487    let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
488    let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
489        None
490    } else {
491        Some(derive_bound_sql_ddl_accepted_after(
492            accepted_before,
493            &bound,
494        )?)
495    };
496    let report = ddl_preparation_report(&bound);
497
498    Ok(PreparedSqlDdlCommand {
499        bound,
500        derivation,
501        report,
502    })
503}
504
505/// Bind one parsed SQL DDL statement against accepted catalog metadata.
506pub(in crate::db) fn bind_sql_ddl_statement(
507    statement: &SqlStatement,
508    accepted_before: &AcceptedSchemaSnapshot,
509    schema: &SchemaInfo,
510    index_store_path: &'static str,
511) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
512    let SqlStatement::Ddl(ddl) = statement else {
513        return Err(SqlDdlBindError::NotDdl);
514    };
515
516    match ddl {
517        SqlDdlStatement::CreateIndex(statement) => {
518            bind_create_index_statement(statement, schema, index_store_path)
519        }
520        SqlDdlStatement::DropIndex(statement) => {
521            bind_drop_index_statement(statement, accepted_before, schema)
522        }
523    }
524}
525
526fn bind_create_index_statement(
527    statement: &SqlCreateIndexStatement,
528    schema: &SchemaInfo,
529    index_store_path: &'static str,
530) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
531    let entity_name = schema
532        .entity_name()
533        .ok_or(SqlDdlBindError::MissingEntityName)?;
534
535    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
536        return Err(SqlDdlBindError::EntityMismatch {
537            sql_entity: statement.entity.clone(),
538            expected_entity: entity_name.to_string(),
539        });
540    }
541
542    let key_items = statement
543        .key_items
544        .iter()
545        .map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
546        .collect::<Result<Vec<_>, _>>()?;
547    let field_paths = create_index_field_path_keys_only(key_items.as_slice())?;
548    if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
549        if statement.if_not_exists
550            && existing_field_path_index_matches_request(
551                existing_index,
552                field_paths.as_slice(),
553                statement.predicate_sql.as_deref(),
554                statement.uniqueness,
555            )
556        {
557            return Ok(BoundSqlDdlRequest {
558                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
559                    mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
560                    index_name: statement.name.clone(),
561                    entity_name: entity_name.to_string(),
562                    target_store: existing_index.store().to_string(),
563                    field_path: ddl_field_path_report(field_paths.as_slice()),
564                }),
565            });
566        }
567
568        return Err(SqlDdlBindError::DuplicateIndexName {
569            index_name: statement.name.clone(),
570        });
571    }
572    reject_duplicate_expression_index_name(statement.name.as_str(), schema)?;
573    let predicate_sql =
574        validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
575    reject_duplicate_field_path_index(field_paths.as_slice(), predicate_sql.as_deref(), schema)?;
576    let candidate_index = candidate_index_snapshot(
577        statement.name.as_str(),
578        field_paths.as_slice(),
579        predicate_sql.as_deref(),
580        statement.uniqueness,
581        schema,
582        index_store_path,
583    )?;
584
585    Ok(BoundSqlDdlRequest {
586        statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
587            index_name: statement.name.clone(),
588            entity_name: entity_name.to_string(),
589            field_paths,
590            candidate_index,
591        }),
592    })
593}
594
595fn bind_drop_index_statement(
596    statement: &SqlDropIndexStatement,
597    accepted_before: &AcceptedSchemaSnapshot,
598    schema: &SchemaInfo,
599) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
600    let entity_name = schema
601        .entity_name()
602        .ok_or(SqlDdlBindError::MissingEntityName)?;
603
604    if let Some(sql_entity) = statement.entity.as_deref()
605        && !identifiers_tail_match(sql_entity, entity_name)
606    {
607        return Err(SqlDdlBindError::EntityMismatch {
608            sql_entity: sql_entity.to_string(),
609            expected_entity: entity_name.to_string(),
610        });
611    }
612    let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
613        accepted_before,
614        &statement.name,
615    )
616    .map_err(|error| match error {
617        SchemaDdlIndexDropCandidateError::Generated => {
618            SqlDdlBindError::GeneratedIndexDropRejected {
619                index_name: statement.name.clone(),
620            }
621        }
622        SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
623            entity_name: entity_name.to_string(),
624            index_name: statement.name.clone(),
625        },
626        SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
627            index_name: statement.name.clone(),
628        },
629    });
630    let (dropped_index, field_path) = match drop_candidate {
631        Ok((dropped_index, field_path)) => (dropped_index, field_path),
632        Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
633            return Ok(BoundSqlDdlRequest {
634                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
635                    mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
636                    index_name: statement.name.clone(),
637                    entity_name: entity_name.to_string(),
638                    target_store: "-".to_string(),
639                    field_path: Vec::new(),
640                }),
641            });
642        }
643        Err(error) => return Err(error),
644    };
645    Ok(BoundSqlDdlRequest {
646        statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
647            index_name: statement.name.clone(),
648            entity_name: entity_name.to_string(),
649            dropped_index,
650            field_path,
651        }),
652    })
653}
654
655#[derive(Clone, Debug, Eq, PartialEq)]
656enum BoundSqlDdlCreateIndexKey {
657    FieldPath(BoundSqlDdlFieldPath),
658    Expression(String),
659}
660
661fn bind_create_index_key_item(
662    key_item: &SqlCreateIndexKeyItem,
663    entity_name: &str,
664    schema: &SchemaInfo,
665) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
666    match key_item {
667        SqlCreateIndexKeyItem::FieldPath(field_path) => {
668            bind_create_index_field_path(field_path.as_str(), entity_name, schema)
669                .map(BoundSqlDdlCreateIndexKey::FieldPath)
670        }
671        SqlCreateIndexKeyItem::Expression(expression) => {
672            bind_create_index_expression_key(expression, entity_name, schema)
673        }
674    }
675}
676
677fn bind_create_index_expression_key(
678    expression: &SqlCreateIndexExpressionKey,
679    entity_name: &str,
680    schema: &SchemaInfo,
681) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
682    let _ = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
683
684    Ok(BoundSqlDdlCreateIndexKey::Expression(
685        expression.canonical_sql(),
686    ))
687}
688
689fn create_index_field_path_keys_only(
690    key_items: &[BoundSqlDdlCreateIndexKey],
691) -> Result<Vec<BoundSqlDdlFieldPath>, SqlDdlBindError> {
692    key_items
693        .iter()
694        .map(|key_item| match key_item {
695            BoundSqlDdlCreateIndexKey::FieldPath(field_path) => Ok(field_path.clone()),
696            BoundSqlDdlCreateIndexKey::Expression(expression) => {
697                Err(SqlDdlBindError::UnsupportedExpressionIndexKey {
698                    expression: expression.clone(),
699                })
700            }
701        })
702        .collect()
703}
704
705fn bind_create_index_field_path(
706    field_path: &str,
707    entity_name: &str,
708    schema: &SchemaInfo,
709) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
710    let mut path = field_path
711        .split('.')
712        .map(str::trim)
713        .filter(|segment| !segment.is_empty());
714    let Some(root) = path.next() else {
715        return Err(SqlDdlBindError::UnknownFieldPath {
716            entity_name: entity_name.to_string(),
717            field_path: field_path.to_string(),
718        });
719    };
720    let segments = path.map(str::to_string).collect::<Vec<_>>();
721
722    let capabilities = if segments.is_empty() {
723        schema.sql_capabilities(root)
724    } else {
725        schema.nested_sql_capabilities(root, segments.as_slice())
726    }
727    .ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
728        entity_name: entity_name.to_string(),
729        field_path: field_path.to_string(),
730    })?;
731
732    if !capabilities.orderable() {
733        return Err(SqlDdlBindError::FieldPathNotIndexable {
734            field_path: field_path.to_string(),
735        });
736    }
737
738    let mut accepted_path = Vec::with_capacity(segments.len() + 1);
739    accepted_path.push(root.to_string());
740    accepted_path.extend(segments.iter().cloned());
741
742    Ok(BoundSqlDdlFieldPath {
743        root: root.to_string(),
744        segments,
745        accepted_path,
746    })
747}
748
749fn find_field_path_index_by_name<'a>(
750    schema: &'a SchemaInfo,
751    index_name: &str,
752) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
753    schema
754        .field_path_indexes()
755        .iter()
756        .find(|index| index.name() == index_name)
757}
758
759fn existing_field_path_index_matches_request(
760    index: &crate::db::schema::SchemaIndexInfo,
761    field_paths: &[BoundSqlDdlFieldPath],
762    predicate_sql: Option<&str>,
763    uniqueness: SqlCreateIndexUniqueness,
764) -> bool {
765    let fields = index.fields();
766
767    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
768        && index.predicate_sql() == predicate_sql
769        && fields.len() == field_paths.len()
770        && fields
771            .iter()
772            .zip(field_paths)
773            .all(|(field, requested)| field.path() == requested.accepted_path())
774}
775
776fn reject_duplicate_expression_index_name(
777    index_name: &str,
778    schema: &SchemaInfo,
779) -> Result<(), SqlDdlBindError> {
780    if schema
781        .expression_indexes()
782        .iter()
783        .any(|index| index.name() == index_name)
784    {
785        return Err(SqlDdlBindError::DuplicateIndexName {
786            index_name: index_name.to_string(),
787        });
788    }
789
790    Ok(())
791}
792
793fn reject_duplicate_field_path_index(
794    field_paths: &[BoundSqlDdlFieldPath],
795    predicate_sql: Option<&str>,
796    schema: &SchemaInfo,
797) -> Result<(), SqlDdlBindError> {
798    let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
799        let fields = index.fields();
800        index.predicate_sql() == predicate_sql
801            && fields.len() == field_paths.len()
802            && fields
803                .iter()
804                .zip(field_paths)
805                .all(|(field, requested)| field.path() == requested.accepted_path())
806    }) else {
807        return Ok(());
808    };
809
810    Err(SqlDdlBindError::DuplicateFieldPathIndex {
811        field_path: ddl_field_path_report(field_paths).join(","),
812        existing_index: existing_index.name().to_string(),
813    })
814}
815
816fn candidate_index_snapshot(
817    index_name: &str,
818    field_paths: &[BoundSqlDdlFieldPath],
819    predicate_sql: Option<&str>,
820    uniqueness: SqlCreateIndexUniqueness,
821    schema: &SchemaInfo,
822    index_store_path: &'static str,
823) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
824    let keys = field_paths
825        .iter()
826        .map(|field_path| {
827            schema
828                .accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
829                .ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
830                    field_path: field_path.accepted_path().join("."),
831                })
832        })
833        .collect::<Result<Vec<_>, _>>()?;
834
835    Ok(PersistedIndexSnapshot::new_sql_ddl(
836        schema.next_secondary_index_ordinal(),
837        index_name.to_string(),
838        index_store_path.to_string(),
839        matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
840        PersistedIndexKeySnapshot::FieldPath(keys),
841        predicate_sql.map(str::to_string),
842    ))
843}
844
845fn validated_create_index_predicate_sql(
846    predicate_sql: Option<&str>,
847    schema: &SchemaInfo,
848) -> Result<Option<String>, SqlDdlBindError> {
849    let Some(predicate_sql) = predicate_sql else {
850        return Ok(None);
851    };
852    let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
853        SqlDdlBindError::InvalidFilteredIndexPredicate {
854            detail: error.to_string(),
855        }
856    })?;
857    validate_predicate(schema, &predicate).map_err(|error| {
858        SqlDdlBindError::InvalidFilteredIndexPredicate {
859            detail: error.to_string(),
860        }
861    })?;
862
863    Ok(Some(predicate_sql.to_string()))
864}
865
866fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
867    match field_paths {
868        [field_path] => field_path.accepted_path().to_vec(),
869        _ => vec![
870            field_paths
871                .iter()
872                .map(|field_path| field_path.accepted_path().join("."))
873                .collect::<Vec<_>>()
874                .join(","),
875        ],
876    }
877}
878
879/// Lower one bound SQL DDL request through schema mutation admission.
880pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
881    request: &BoundSqlDdlRequest,
882) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
883    match request.statement() {
884        BoundSqlDdlStatement::CreateIndex(create) => {
885            admit_sql_ddl_field_path_index_candidate(create.candidate_index())
886        }
887        BoundSqlDdlStatement::DropIndex(drop) => {
888            admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
889        }
890        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
891    }
892    .map_err(SqlDdlLoweringError::MutationAdmission)
893}
894
895/// Derive the accepted-after schema snapshot for one bound SQL DDL request.
896pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
897    accepted_before: &AcceptedSchemaSnapshot,
898    request: &BoundSqlDdlRequest,
899) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
900    match request.statement() {
901        BoundSqlDdlStatement::CreateIndex(create) => {
902            derive_sql_ddl_field_path_index_accepted_after(
903                accepted_before,
904                create.candidate_index().clone(),
905            )
906        }
907        BoundSqlDdlStatement::DropIndex(drop) => {
908            derive_sql_ddl_secondary_index_drop_accepted_after(
909                accepted_before,
910                drop.dropped_index(),
911            )
912        }
913        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
914    }
915    .map_err(SqlDdlLoweringError::MutationAdmission)
916}
917
918fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
919    match bound.statement() {
920        BoundSqlDdlStatement::CreateIndex(create) => {
921            let target = create.candidate_index();
922
923            SqlDdlPreparationReport {
924                mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
925                target_index: target.name().to_string(),
926                target_store: target.store().to_string(),
927                field_path: ddl_field_path_report(create.field_paths()),
928                execution_status: SqlDdlExecutionStatus::PreparedOnly,
929                rows_scanned: 0,
930                index_keys_written: 0,
931            }
932        }
933        BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
934            mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
935            target_index: drop.index_name().to_string(),
936            target_store: drop.dropped_index().store().to_string(),
937            field_path: drop.field_path().to_vec(),
938            execution_status: SqlDdlExecutionStatus::PreparedOnly,
939            rows_scanned: 0,
940            index_keys_written: 0,
941        },
942        BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
943            mutation_kind: no_op.mutation_kind(),
944            target_index: no_op.index_name().to_string(),
945            target_store: no_op.target_store().to_string(),
946            field_path: no_op.field_path().to_vec(),
947            execution_status: SqlDdlExecutionStatus::PreparedOnly,
948            rows_scanned: 0,
949            index_keys_written: 0,
950        },
951    }
952}