Skip to main content

icydb_core/db/sql/
ddl.rs

1//! Module: db::sql::ddl
2//! Responsibility: bind parsed SQL DDL to accepted schema catalog contracts.
3//! Does not own: mutation planning, physical index rebuilds, or SQL execution.
4//! Boundary: translates parser-owned DDL syntax into catalog-native requests.
5
6#![allow(
7    dead_code,
8    reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
9)]
10
11use crate::db::{
12    predicate::parse_sql_predicate,
13    query::predicate::validate_predicate,
14    schema::{
15        AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexExpressionOp,
16        PersistedIndexExpressionSnapshot, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
17        PersistedIndexSnapshot, SchemaDdlAcceptedSnapshotDerivation,
18        SchemaDdlIndexDropCandidateError, SchemaDdlMutationAdmission,
19        SchemaDdlMutationAdmissionError, SchemaExpressionIndexInfo,
20        SchemaExpressionIndexKeyItemInfo, SchemaInfo, admit_sql_ddl_expression_index_candidate,
21        admit_sql_ddl_field_path_index_candidate, admit_sql_ddl_secondary_index_drop_candidate,
22        derive_sql_ddl_expression_index_accepted_after,
23        derive_sql_ddl_field_path_index_accepted_after,
24        derive_sql_ddl_secondary_index_drop_accepted_after,
25        resolve_sql_ddl_secondary_index_drop_candidate,
26    },
27    sql::{
28        identifier::identifiers_tail_match,
29        parser::{
30            SqlAlterTableAddColumnStatement, SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem,
31            SqlCreateIndexStatement, SqlCreateIndexUniqueness, SqlDdlStatement,
32            SqlDropIndexStatement, SqlStatement,
33        },
34    },
35};
36use thiserror::Error as ThisError;
37
38///
39/// PreparedSqlDdlCommand
40///
41/// Fully prepared SQL DDL command. This is intentionally not executable yet:
42/// it packages the accepted-catalog binding, accepted-after derivation, and
43/// schema mutation admission proof for the future execution boundary.
44///
45#[derive(Clone, Debug, Eq, PartialEq)]
46pub(in crate::db) struct PreparedSqlDdlCommand {
47    bound: BoundSqlDdlRequest,
48    derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
49    report: SqlDdlPreparationReport,
50}
51
52impl PreparedSqlDdlCommand {
53    /// Borrow the accepted-catalog-bound DDL request.
54    #[must_use]
55    pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
56        &self.bound
57    }
58
59    /// Borrow the accepted-after derivation proof.
60    #[must_use]
61    pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
62        self.derivation.as_ref()
63    }
64
65    /// Borrow the developer-facing preparation report.
66    #[must_use]
67    pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
68        &self.report
69    }
70
71    /// Return whether this prepared command needs schema or storage mutation.
72    #[must_use]
73    pub(in crate::db) const fn mutates_schema(&self) -> bool {
74        self.derivation.is_some()
75    }
76}
77
78///
79/// SqlDdlPreparationReport
80///
81/// Compact report for a DDL command that has passed all pre-execution
82/// frontend and schema-mutation checks.
83///
84#[derive(Clone, Debug, Eq, PartialEq)]
85pub struct SqlDdlPreparationReport {
86    mutation_kind: SqlDdlMutationKind,
87    target_index: String,
88    target_store: String,
89    field_path: Vec<String>,
90    execution_status: SqlDdlExecutionStatus,
91    rows_scanned: usize,
92    index_keys_written: usize,
93}
94
95impl SqlDdlPreparationReport {
96    /// Return the prepared DDL mutation kind.
97    #[must_use]
98    pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
99        self.mutation_kind
100    }
101
102    /// Borrow the target accepted index name.
103    #[must_use]
104    pub const fn target_index(&self) -> &str {
105        self.target_index.as_str()
106    }
107
108    /// Borrow the target accepted index store path.
109    #[must_use]
110    pub const fn target_store(&self) -> &str {
111        self.target_store.as_str()
112    }
113
114    /// Borrow the target field path.
115    #[must_use]
116    pub const fn field_path(&self) -> &[String] {
117        self.field_path.as_slice()
118    }
119
120    /// Return the execution status captured by this DDL report.
121    #[must_use]
122    pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
123        self.execution_status
124    }
125
126    /// Return rows scanned by DDL execution.
127    #[must_use]
128    pub const fn rows_scanned(&self) -> usize {
129        self.rows_scanned
130    }
131
132    /// Return index keys written by DDL execution.
133    #[must_use]
134    pub const fn index_keys_written(&self) -> usize {
135        self.index_keys_written
136    }
137
138    pub(in crate::db) const fn with_execution_status(
139        mut self,
140        execution_status: SqlDdlExecutionStatus,
141    ) -> Self {
142        self.execution_status = execution_status;
143        self
144    }
145
146    pub(in crate::db) const fn with_execution_metrics(
147        mut self,
148        rows_scanned: usize,
149        index_keys_written: usize,
150    ) -> Self {
151        self.rows_scanned = rows_scanned;
152        self.index_keys_written = index_keys_written;
153        self
154    }
155}
156
157///
158/// SqlDdlMutationKind
159///
160/// Developer-facing SQL DDL mutation kind.
161///
162#[derive(Clone, Copy, Debug, Eq, PartialEq)]
163pub enum SqlDdlMutationKind {
164    AddFieldPathIndex,
165    AddExpressionIndex,
166    DropSecondaryIndex,
167}
168
169impl SqlDdlMutationKind {
170    /// Return the stable diagnostic label for this DDL mutation kind.
171    #[must_use]
172    pub const fn as_str(self) -> &'static str {
173        match self {
174            Self::AddFieldPathIndex => "add_field_path_index",
175            Self::AddExpressionIndex => "add_expression_index",
176            Self::DropSecondaryIndex => "drop_secondary_index",
177        }
178    }
179}
180
181///
182/// SqlDdlExecutionStatus
183///
184/// SQL DDL execution state at the current boundary.
185///
186#[derive(Clone, Copy, Debug, Eq, PartialEq)]
187pub enum SqlDdlExecutionStatus {
188    PreparedOnly,
189    Published,
190    NoOp,
191}
192
193impl SqlDdlExecutionStatus {
194    /// Return the stable diagnostic label for this execution status.
195    #[must_use]
196    pub const fn as_str(self) -> &'static str {
197        match self {
198            Self::PreparedOnly => "prepared_only",
199            Self::Published => "published",
200            Self::NoOp => "no_op",
201        }
202    }
203}
204
205///
206/// BoundSqlDdlRequest
207///
208/// Accepted-catalog SQL DDL request after parser syntax has been resolved
209/// against one runtime schema snapshot.
210///
211#[derive(Clone, Debug, Eq, PartialEq)]
212pub(in crate::db) struct BoundSqlDdlRequest {
213    statement: BoundSqlDdlStatement,
214}
215
216impl BoundSqlDdlRequest {
217    /// Borrow the bound statement payload.
218    #[must_use]
219    pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
220        &self.statement
221    }
222}
223
224///
225/// BoundSqlDdlStatement
226///
227/// Catalog-resolved DDL statement vocabulary.
228///
229#[derive(Clone, Debug, Eq, PartialEq)]
230pub(in crate::db) enum BoundSqlDdlStatement {
231    CreateIndex(BoundSqlCreateIndexRequest),
232    DropIndex(BoundSqlDropIndexRequest),
233    NoOp(BoundSqlDdlNoOpRequest),
234}
235
236///
237/// BoundSqlDdlNoOpRequest
238///
239/// Catalog-resolved idempotent DDL request that is already satisfied or absent.
240///
241#[derive(Clone, Debug, Eq, PartialEq)]
242pub(in crate::db) struct BoundSqlDdlNoOpRequest {
243    mutation_kind: SqlDdlMutationKind,
244    index_name: String,
245    entity_name: String,
246    target_store: String,
247    field_path: Vec<String>,
248}
249
250impl BoundSqlDdlNoOpRequest {
251    /// Return the user-facing mutation family this no-op belongs to.
252    #[must_use]
253    pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
254        self.mutation_kind
255    }
256
257    /// Borrow the requested index name.
258    #[must_use]
259    pub(in crate::db) const fn index_name(&self) -> &str {
260        self.index_name.as_str()
261    }
262
263    /// Borrow the accepted entity name that owns this request.
264    #[must_use]
265    pub(in crate::db) const fn entity_name(&self) -> &str {
266        self.entity_name.as_str()
267    }
268
269    /// Borrow the accepted index store path, or `-` when no target exists.
270    #[must_use]
271    pub(in crate::db) const fn target_store(&self) -> &str {
272        self.target_store.as_str()
273    }
274
275    /// Borrow the target field path, empty when no target exists.
276    #[must_use]
277    pub(in crate::db) const fn field_path(&self) -> &[String] {
278        self.field_path.as_slice()
279    }
280}
281
282///
283/// BoundSqlCreateIndexRequest
284///
285/// Catalog-resolved request for adding one secondary index.
286///
287#[derive(Clone, Debug, Eq, PartialEq)]
288pub(in crate::db) struct BoundSqlCreateIndexRequest {
289    index_name: String,
290    entity_name: String,
291    key_items: Vec<BoundSqlDdlCreateIndexKey>,
292    field_paths: Vec<BoundSqlDdlFieldPath>,
293    candidate_index: PersistedIndexSnapshot,
294}
295
296impl BoundSqlCreateIndexRequest {
297    /// Borrow the requested index name.
298    #[must_use]
299    pub(in crate::db) const fn index_name(&self) -> &str {
300        self.index_name.as_str()
301    }
302
303    /// Borrow the accepted entity name that owns this request.
304    #[must_use]
305    pub(in crate::db) const fn entity_name(&self) -> &str {
306        self.entity_name.as_str()
307    }
308
309    /// Borrow the accepted field-path targets.
310    #[must_use]
311    pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
312        self.field_paths.as_slice()
313    }
314
315    /// Borrow the accepted key targets in DDL key order.
316    #[must_use]
317    pub(in crate::db) const fn key_items(&self) -> &[BoundSqlDdlCreateIndexKey] {
318        self.key_items.as_slice()
319    }
320
321    /// Borrow the candidate accepted index snapshot for mutation admission.
322    #[must_use]
323    pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
324        &self.candidate_index
325    }
326}
327
328///
329/// BoundSqlDropIndexRequest
330///
331/// Catalog-resolved request for dropping one DDL-published secondary index.
332///
333#[derive(Clone, Debug, Eq, PartialEq)]
334pub(in crate::db) struct BoundSqlDropIndexRequest {
335    index_name: String,
336    entity_name: String,
337    dropped_index: PersistedIndexSnapshot,
338    field_path: Vec<String>,
339}
340
341impl BoundSqlDropIndexRequest {
342    /// Borrow the requested index name.
343    #[must_use]
344    pub(in crate::db) const fn index_name(&self) -> &str {
345        self.index_name.as_str()
346    }
347
348    /// Borrow the accepted entity name that owns this request.
349    #[must_use]
350    pub(in crate::db) const fn entity_name(&self) -> &str {
351        self.entity_name.as_str()
352    }
353
354    /// Borrow the accepted index snapshot that will be removed.
355    #[must_use]
356    pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
357        &self.dropped_index
358    }
359
360    /// Borrow the dropped field-path target.
361    #[must_use]
362    pub(in crate::db) const fn field_path(&self) -> &[String] {
363        self.field_path.as_slice()
364    }
365}
366
367///
368/// BoundSqlDdlFieldPath
369///
370/// Accepted field-path target for SQL DDL binding.
371///
372#[derive(Clone, Debug, Eq, PartialEq)]
373pub(in crate::db) struct BoundSqlDdlFieldPath {
374    root: String,
375    segments: Vec<String>,
376    accepted_path: Vec<String>,
377}
378
379impl BoundSqlDdlFieldPath {
380    /// Borrow the top-level field name.
381    #[must_use]
382    pub(in crate::db) const fn root(&self) -> &str {
383        self.root.as_str()
384    }
385
386    /// Borrow nested path segments below the top-level field.
387    #[must_use]
388    pub(in crate::db) const fn segments(&self) -> &[String] {
389        self.segments.as_slice()
390    }
391
392    /// Borrow the full accepted field path used by index metadata.
393    #[must_use]
394    pub(in crate::db) const fn accepted_path(&self) -> &[String] {
395        self.accepted_path.as_slice()
396    }
397}
398
399///
400/// SqlDdlBindError
401///
402/// Typed fail-closed reasons for SQL DDL catalog binding.
403///
404#[derive(Debug, Eq, PartialEq, ThisError)]
405pub(in crate::db) enum SqlDdlBindError {
406    #[error("SQL DDL binder requires a DDL statement")]
407    NotDdl,
408
409    #[error("accepted schema does not expose an entity name")]
410    MissingEntityName,
411
412    #[error("accepted schema does not expose an entity path")]
413    MissingEntityPath,
414
415    #[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
416    EntityMismatch {
417        sql_entity: String,
418        expected_entity: String,
419    },
420
421    #[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
422    UnknownFieldPath {
423        entity_name: String,
424        field_path: String,
425    },
426
427    #[error("field path '{field_path}' is not indexable")]
428    FieldPathNotIndexable { field_path: String },
429
430    #[error("field path '{field_path}' depends on generated-only metadata")]
431    FieldPathNotAcceptedCatalogBacked { field_path: String },
432
433    #[error("invalid filtered index predicate: {detail}")]
434    InvalidFilteredIndexPredicate { detail: String },
435
436    #[error("index name '{index_name}' already exists in the accepted schema")]
437    DuplicateIndexName { index_name: String },
438
439    #[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
440    DuplicateFieldPathIndex {
441        field_path: String,
442        existing_index: String,
443    },
444
445    #[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
446    UnknownIndex {
447        entity_name: String,
448        index_name: String,
449    },
450
451    #[error(
452        "index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
453    )]
454    GeneratedIndexDropRejected { index_name: String },
455
456    #[error(
457        "index '{index_name}' is not a supported DDL-droppable secondary index; SQL DDL can currently drop only indexes created through SQL DDL"
458    )]
459    UnsupportedDropIndex { index_name: String },
460
461    #[error(
462        "SQL DDL ALTER TABLE ADD COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
463    )]
464    UnsupportedAlterTableAddColumn {
465        entity_name: String,
466        column_name: String,
467    },
468}
469
470///
471/// SqlDdlLoweringError
472///
473/// Typed fail-closed reasons while lowering bound DDL into schema mutation
474/// admission.
475///
476#[derive(Debug, Eq, PartialEq, ThisError)]
477pub(in crate::db) enum SqlDdlLoweringError {
478    #[error("SQL DDL lowering requires a supported DDL statement")]
479    UnsupportedStatement,
480
481    #[error("schema mutation admission rejected DDL candidate: {0:?}")]
482    MutationAdmission(SchemaDdlMutationAdmissionError),
483}
484
485///
486/// SqlDdlPrepareError
487///
488/// Typed fail-closed preparation errors for SQL DDL.
489///
490#[derive(Debug, Eq, PartialEq, ThisError)]
491pub(in crate::db) enum SqlDdlPrepareError {
492    #[error("{0}")]
493    Bind(#[from] SqlDdlBindError),
494
495    #[error("{0}")]
496    Lowering(#[from] SqlDdlLoweringError),
497}
498
499/// Prepare one parsed SQL DDL statement through every pre-execution proof.
500pub(in crate::db) fn prepare_sql_ddl_statement(
501    statement: &SqlStatement,
502    accepted_before: &AcceptedSchemaSnapshot,
503    schema: &SchemaInfo,
504    index_store_path: &'static str,
505) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
506    let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
507    let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
508        None
509    } else {
510        Some(derive_bound_sql_ddl_accepted_after(
511            accepted_before,
512            &bound,
513        )?)
514    };
515    let report = ddl_preparation_report(&bound);
516
517    Ok(PreparedSqlDdlCommand {
518        bound,
519        derivation,
520        report,
521    })
522}
523
524/// Bind one parsed SQL DDL statement against accepted catalog metadata.
525pub(in crate::db) fn bind_sql_ddl_statement(
526    statement: &SqlStatement,
527    accepted_before: &AcceptedSchemaSnapshot,
528    schema: &SchemaInfo,
529    index_store_path: &'static str,
530) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
531    let SqlStatement::Ddl(ddl) = statement else {
532        return Err(SqlDdlBindError::NotDdl);
533    };
534
535    match ddl {
536        SqlDdlStatement::CreateIndex(statement) => {
537            bind_create_index_statement(statement, schema, index_store_path)
538        }
539        SqlDdlStatement::DropIndex(statement) => {
540            bind_drop_index_statement(statement, accepted_before, schema)
541        }
542        SqlDdlStatement::AlterTableAddColumn(statement) => {
543            bind_alter_table_add_column_statement(statement, schema)
544        }
545    }
546}
547
548fn bind_create_index_statement(
549    statement: &SqlCreateIndexStatement,
550    schema: &SchemaInfo,
551    index_store_path: &'static str,
552) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
553    let entity_name = schema
554        .entity_name()
555        .ok_or(SqlDdlBindError::MissingEntityName)?;
556
557    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
558        return Err(SqlDdlBindError::EntityMismatch {
559            sql_entity: statement.entity.clone(),
560            expected_entity: entity_name.to_string(),
561        });
562    }
563
564    let key_items = statement
565        .key_items
566        .iter()
567        .map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
568        .collect::<Result<Vec<_>, _>>()?;
569    let field_paths = create_index_field_path_report_items(key_items.as_slice());
570    if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
571        if key_items_are_field_path_only(key_items.as_slice())
572            && statement.if_not_exists
573            && existing_field_path_index_matches_request(
574                existing_index,
575                field_paths.as_slice(),
576                statement.predicate_sql.as_deref(),
577                statement.uniqueness,
578            )
579        {
580            return Ok(BoundSqlDdlRequest {
581                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
582                    mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
583                    index_name: statement.name.clone(),
584                    entity_name: entity_name.to_string(),
585                    target_store: existing_index.store().to_string(),
586                    field_path: ddl_field_path_report(field_paths.as_slice()),
587                }),
588            });
589        }
590
591        return Err(SqlDdlBindError::DuplicateIndexName {
592            index_name: statement.name.clone(),
593        });
594    }
595    let predicate_sql =
596        validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
597    if let Some(existing_index) = find_expression_index_by_name(schema, statement.name.as_str()) {
598        if statement.if_not_exists
599            && existing_expression_index_matches_request(
600                existing_index,
601                key_items.as_slice(),
602                predicate_sql.as_deref(),
603                statement.uniqueness,
604            )
605        {
606            return Ok(BoundSqlDdlRequest {
607                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
608                    mutation_kind: SqlDdlMutationKind::AddExpressionIndex,
609                    index_name: statement.name.clone(),
610                    entity_name: entity_name.to_string(),
611                    target_store: existing_index.store().to_string(),
612                    field_path: ddl_key_item_report(key_items.as_slice()),
613                }),
614            });
615        }
616
617        return Err(SqlDdlBindError::DuplicateIndexName {
618            index_name: statement.name.clone(),
619        });
620    }
621    if key_items_are_field_path_only(key_items.as_slice()) {
622        reject_duplicate_field_path_index(
623            field_paths.as_slice(),
624            predicate_sql.as_deref(),
625            schema,
626        )?;
627    } else {
628        reject_duplicate_expression_index(key_items.as_slice(), predicate_sql.as_deref(), schema)?;
629    }
630    let candidate_index = candidate_index_snapshot(
631        statement.name.as_str(),
632        key_items.as_slice(),
633        predicate_sql.as_deref(),
634        statement.uniqueness,
635        schema,
636        index_store_path,
637    )?;
638
639    Ok(BoundSqlDdlRequest {
640        statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
641            index_name: statement.name.clone(),
642            entity_name: entity_name.to_string(),
643            key_items,
644            field_paths,
645            candidate_index,
646        }),
647    })
648}
649
650fn bind_drop_index_statement(
651    statement: &SqlDropIndexStatement,
652    accepted_before: &AcceptedSchemaSnapshot,
653    schema: &SchemaInfo,
654) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
655    let entity_name = schema
656        .entity_name()
657        .ok_or(SqlDdlBindError::MissingEntityName)?;
658
659    if let Some(sql_entity) = statement.entity.as_deref()
660        && !identifiers_tail_match(sql_entity, entity_name)
661    {
662        return Err(SqlDdlBindError::EntityMismatch {
663            sql_entity: sql_entity.to_string(),
664            expected_entity: entity_name.to_string(),
665        });
666    }
667    let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
668        accepted_before,
669        &statement.name,
670    )
671    .map_err(|error| match error {
672        SchemaDdlIndexDropCandidateError::Generated => {
673            SqlDdlBindError::GeneratedIndexDropRejected {
674                index_name: statement.name.clone(),
675            }
676        }
677        SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
678            entity_name: entity_name.to_string(),
679            index_name: statement.name.clone(),
680        },
681        SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
682            index_name: statement.name.clone(),
683        },
684    });
685    let (dropped_index, field_path) = match drop_candidate {
686        Ok((dropped_index, field_path)) => (dropped_index, field_path),
687        Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
688            return Ok(BoundSqlDdlRequest {
689                statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
690                    mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
691                    index_name: statement.name.clone(),
692                    entity_name: entity_name.to_string(),
693                    target_store: "-".to_string(),
694                    field_path: Vec::new(),
695                }),
696            });
697        }
698        Err(error) => return Err(error),
699    };
700    Ok(BoundSqlDdlRequest {
701        statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
702            index_name: statement.name.clone(),
703            entity_name: entity_name.to_string(),
704            dropped_index,
705            field_path,
706        }),
707    })
708}
709
710fn bind_alter_table_add_column_statement(
711    statement: &SqlAlterTableAddColumnStatement,
712    schema: &SchemaInfo,
713) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
714    let entity_name = schema
715        .entity_name()
716        .ok_or(SqlDdlBindError::MissingEntityName)?;
717
718    if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
719        return Err(SqlDdlBindError::EntityMismatch {
720            sql_entity: statement.entity.clone(),
721            expected_entity: entity_name.to_string(),
722        });
723    }
724
725    Err(SqlDdlBindError::UnsupportedAlterTableAddColumn {
726        entity_name: entity_name.to_string(),
727        column_name: statement.column_name.clone(),
728    })
729}
730
731#[derive(Clone, Debug, Eq, PartialEq)]
732pub(in crate::db) enum BoundSqlDdlCreateIndexKey {
733    FieldPath(BoundSqlDdlFieldPath),
734    Expression(BoundSqlDdlExpressionKey),
735}
736
737///
738/// BoundSqlDdlExpressionKey
739///
740/// Accepted expression-index key target for SQL DDL binding.
741///
742#[derive(Clone, Debug, Eq, PartialEq)]
743pub(in crate::db) struct BoundSqlDdlExpressionKey {
744    op: PersistedIndexExpressionOp,
745    source: BoundSqlDdlFieldPath,
746    canonical_sql: String,
747}
748
749impl BoundSqlDdlExpressionKey {
750    /// Return the accepted expression operation.
751    #[must_use]
752    pub(in crate::db) const fn op(&self) -> PersistedIndexExpressionOp {
753        self.op
754    }
755
756    /// Borrow the accepted source field path.
757    #[must_use]
758    pub(in crate::db) const fn source(&self) -> &BoundSqlDdlFieldPath {
759        &self.source
760    }
761
762    /// Borrow the SQL-facing canonical expression text.
763    #[must_use]
764    pub(in crate::db) const fn canonical_sql(&self) -> &str {
765        self.canonical_sql.as_str()
766    }
767}
768
769fn bind_create_index_key_item(
770    key_item: &SqlCreateIndexKeyItem,
771    entity_name: &str,
772    schema: &SchemaInfo,
773) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
774    match key_item {
775        SqlCreateIndexKeyItem::FieldPath(field_path) => {
776            bind_create_index_field_path(field_path.as_str(), entity_name, schema)
777                .map(BoundSqlDdlCreateIndexKey::FieldPath)
778        }
779        SqlCreateIndexKeyItem::Expression(expression) => {
780            bind_create_index_expression_key(expression, entity_name, schema)
781        }
782    }
783}
784
785fn bind_create_index_expression_key(
786    expression: &SqlCreateIndexExpressionKey,
787    entity_name: &str,
788    schema: &SchemaInfo,
789) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
790    let source = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
791
792    Ok(BoundSqlDdlCreateIndexKey::Expression(
793        BoundSqlDdlExpressionKey {
794            op: expression_op_from_sql_function(expression.function),
795            source,
796            canonical_sql: expression.canonical_sql(),
797        },
798    ))
799}
800
801const fn expression_op_from_sql_function(
802    function: crate::db::sql::parser::SqlCreateIndexExpressionFunction,
803) -> PersistedIndexExpressionOp {
804    match function {
805        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Lower => {
806            PersistedIndexExpressionOp::Lower
807        }
808        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Upper => {
809            PersistedIndexExpressionOp::Upper
810        }
811        crate::db::sql::parser::SqlCreateIndexExpressionFunction::Trim => {
812            PersistedIndexExpressionOp::Trim
813        }
814    }
815}
816
817fn key_items_are_field_path_only(key_items: &[BoundSqlDdlCreateIndexKey]) -> bool {
818    key_items
819        .iter()
820        .all(|key_item| matches!(key_item, BoundSqlDdlCreateIndexKey::FieldPath(_)))
821}
822
823fn create_index_field_path_report_items(
824    key_items: &[BoundSqlDdlCreateIndexKey],
825) -> Vec<BoundSqlDdlFieldPath> {
826    key_items
827        .iter()
828        .map(|key_item| match key_item {
829            BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.clone(),
830            BoundSqlDdlCreateIndexKey::Expression(expression) => expression.source().clone(),
831        })
832        .collect()
833}
834
835fn bind_create_index_field_path(
836    field_path: &str,
837    entity_name: &str,
838    schema: &SchemaInfo,
839) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
840    let mut path = field_path
841        .split('.')
842        .map(str::trim)
843        .filter(|segment| !segment.is_empty());
844    let Some(root) = path.next() else {
845        return Err(SqlDdlBindError::UnknownFieldPath {
846            entity_name: entity_name.to_string(),
847            field_path: field_path.to_string(),
848        });
849    };
850    let segments = path.map(str::to_string).collect::<Vec<_>>();
851
852    let capabilities = if segments.is_empty() {
853        schema.sql_capabilities(root)
854    } else {
855        schema.nested_sql_capabilities(root, segments.as_slice())
856    }
857    .ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
858        entity_name: entity_name.to_string(),
859        field_path: field_path.to_string(),
860    })?;
861
862    if !capabilities.orderable() {
863        return Err(SqlDdlBindError::FieldPathNotIndexable {
864            field_path: field_path.to_string(),
865        });
866    }
867
868    let mut accepted_path = Vec::with_capacity(segments.len() + 1);
869    accepted_path.push(root.to_string());
870    accepted_path.extend(segments.iter().cloned());
871
872    Ok(BoundSqlDdlFieldPath {
873        root: root.to_string(),
874        segments,
875        accepted_path,
876    })
877}
878
879fn find_field_path_index_by_name<'a>(
880    schema: &'a SchemaInfo,
881    index_name: &str,
882) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
883    schema
884        .field_path_indexes()
885        .iter()
886        .find(|index| index.name() == index_name)
887}
888
889fn existing_field_path_index_matches_request(
890    index: &crate::db::schema::SchemaIndexInfo,
891    field_paths: &[BoundSqlDdlFieldPath],
892    predicate_sql: Option<&str>,
893    uniqueness: SqlCreateIndexUniqueness,
894) -> bool {
895    let fields = index.fields();
896
897    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
898        && index.predicate_sql() == predicate_sql
899        && fields.len() == field_paths.len()
900        && fields
901            .iter()
902            .zip(field_paths)
903            .all(|(field, requested)| field.path() == requested.accepted_path())
904}
905
906fn find_expression_index_by_name<'a>(
907    schema: &'a SchemaInfo,
908    index_name: &str,
909) -> Option<&'a SchemaExpressionIndexInfo> {
910    schema
911        .expression_indexes()
912        .iter()
913        .find(|index| index.name() == index_name)
914}
915
916fn existing_expression_index_matches_request(
917    index: &SchemaExpressionIndexInfo,
918    key_items: &[BoundSqlDdlCreateIndexKey],
919    predicate_sql: Option<&str>,
920    uniqueness: SqlCreateIndexUniqueness,
921) -> bool {
922    let existing_key_items = index.key_items();
923
924    index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
925        && index.predicate_sql() == predicate_sql
926        && existing_key_items.len() == key_items.len()
927        && existing_key_items
928            .iter()
929            .zip(key_items)
930            .all(existing_expression_key_item_matches_request)
931}
932
933fn existing_expression_key_item_matches_request(
934    existing: (
935        &SchemaExpressionIndexKeyItemInfo,
936        &BoundSqlDdlCreateIndexKey,
937    ),
938) -> bool {
939    let (existing, requested) = existing;
940    match (existing, requested) {
941        (
942            SchemaExpressionIndexKeyItemInfo::FieldPath(existing),
943            BoundSqlDdlCreateIndexKey::FieldPath(requested),
944        ) => existing.path() == requested.accepted_path(),
945        (
946            SchemaExpressionIndexKeyItemInfo::Expression(existing),
947            BoundSqlDdlCreateIndexKey::Expression(requested),
948        ) => existing_expression_component_matches_request(
949            existing.op(),
950            existing.source().path(),
951            existing.canonical_text(),
952            requested,
953        ),
954        _ => false,
955    }
956}
957
958fn existing_expression_component_matches_request(
959    existing_op: PersistedIndexExpressionOp,
960    existing_path: &[String],
961    existing_canonical_text: &str,
962    requested: &BoundSqlDdlExpressionKey,
963) -> bool {
964    let requested_path = requested.source().accepted_path();
965    let requested_canonical_text = format!("expr:v1:{}", requested.canonical_sql());
966
967    existing_op == requested.op()
968        && existing_path == requested_path
969        && existing_canonical_text == requested_canonical_text
970}
971
972fn reject_duplicate_expression_index(
973    key_items: &[BoundSqlDdlCreateIndexKey],
974    predicate_sql: Option<&str>,
975    schema: &SchemaInfo,
976) -> Result<(), SqlDdlBindError> {
977    let Some(existing_index) = schema.expression_indexes().iter().find(|index| {
978        existing_expression_index_matches_request(
979            index,
980            key_items,
981            predicate_sql,
982            if index.unique() {
983                SqlCreateIndexUniqueness::Unique
984            } else {
985                SqlCreateIndexUniqueness::NonUnique
986            },
987        )
988    }) else {
989        return Ok(());
990    };
991
992    Err(SqlDdlBindError::DuplicateFieldPathIndex {
993        field_path: ddl_key_item_report(key_items).join(","),
994        existing_index: existing_index.name().to_string(),
995    })
996}
997
998fn reject_duplicate_field_path_index(
999    field_paths: &[BoundSqlDdlFieldPath],
1000    predicate_sql: Option<&str>,
1001    schema: &SchemaInfo,
1002) -> Result<(), SqlDdlBindError> {
1003    let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
1004        let fields = index.fields();
1005        index.predicate_sql() == predicate_sql
1006            && fields.len() == field_paths.len()
1007            && fields
1008                .iter()
1009                .zip(field_paths)
1010                .all(|(field, requested)| field.path() == requested.accepted_path())
1011    }) else {
1012        return Ok(());
1013    };
1014
1015    Err(SqlDdlBindError::DuplicateFieldPathIndex {
1016        field_path: ddl_field_path_report(field_paths).join(","),
1017        existing_index: existing_index.name().to_string(),
1018    })
1019}
1020
1021fn candidate_index_snapshot(
1022    index_name: &str,
1023    key_items: &[BoundSqlDdlCreateIndexKey],
1024    predicate_sql: Option<&str>,
1025    uniqueness: SqlCreateIndexUniqueness,
1026    schema: &SchemaInfo,
1027    index_store_path: &'static str,
1028) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
1029    let key = if key_items_are_field_path_only(key_items) {
1030        PersistedIndexKeySnapshot::FieldPath(
1031            key_items
1032                .iter()
1033                .map(|key_item| {
1034                    let BoundSqlDdlCreateIndexKey::FieldPath(field_path) = key_item else {
1035                        unreachable!("field-path-only index checked before field-path lowering");
1036                    };
1037
1038                    accepted_index_field_path_snapshot(schema, field_path)
1039                })
1040                .collect::<Result<Vec<_>, _>>()?,
1041        )
1042    } else {
1043        PersistedIndexKeySnapshot::Items(
1044            key_items
1045                .iter()
1046                .map(|key_item| match key_item {
1047                    BoundSqlDdlCreateIndexKey::FieldPath(field_path) => {
1048                        accepted_index_field_path_snapshot(schema, field_path)
1049                            .map(PersistedIndexKeyItemSnapshot::FieldPath)
1050                    }
1051                    BoundSqlDdlCreateIndexKey::Expression(expression) => {
1052                        accepted_index_expression_snapshot(schema, expression)
1053                    }
1054                })
1055                .collect::<Result<Vec<_>, _>>()?,
1056        )
1057    };
1058
1059    Ok(PersistedIndexSnapshot::new_sql_ddl(
1060        schema.next_secondary_index_ordinal(),
1061        index_name.to_string(),
1062        index_store_path.to_string(),
1063        matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
1064        key,
1065        predicate_sql.map(str::to_string),
1066    ))
1067}
1068
1069fn accepted_index_field_path_snapshot(
1070    schema: &SchemaInfo,
1071    field_path: &BoundSqlDdlFieldPath,
1072) -> Result<crate::db::schema::PersistedIndexFieldPathSnapshot, SqlDdlBindError> {
1073    schema
1074        .accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
1075        .ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
1076            field_path: field_path.accepted_path().join("."),
1077        })
1078}
1079
1080fn accepted_index_expression_snapshot(
1081    schema: &SchemaInfo,
1082    expression: &BoundSqlDdlExpressionKey,
1083) -> Result<PersistedIndexKeyItemSnapshot, SqlDdlBindError> {
1084    let source = accepted_index_field_path_snapshot(schema, expression.source())?;
1085    let Some(output_kind) = expression_output_kind(expression.op(), source.kind()) else {
1086        return Err(SqlDdlBindError::FieldPathNotIndexable {
1087            field_path: expression.source().accepted_path().join("."),
1088        });
1089    };
1090
1091    Ok(PersistedIndexKeyItemSnapshot::Expression(Box::new(
1092        PersistedIndexExpressionSnapshot::new(
1093            expression.op(),
1094            source.clone(),
1095            source.kind().clone(),
1096            output_kind,
1097            format!("expr:v1:{}", expression.canonical_sql()),
1098        ),
1099    )))
1100}
1101
1102fn expression_output_kind(
1103    op: PersistedIndexExpressionOp,
1104    source_kind: &PersistedFieldKind,
1105) -> Option<PersistedFieldKind> {
1106    match op {
1107        PersistedIndexExpressionOp::Lower
1108        | PersistedIndexExpressionOp::Upper
1109        | PersistedIndexExpressionOp::Trim
1110        | PersistedIndexExpressionOp::LowerTrim => {
1111            if matches!(source_kind, PersistedFieldKind::Text { .. }) {
1112                Some(source_kind.clone())
1113            } else {
1114                None
1115            }
1116        }
1117        PersistedIndexExpressionOp::Date => {
1118            if matches!(
1119                source_kind,
1120                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1121            ) {
1122                Some(PersistedFieldKind::Date)
1123            } else {
1124                None
1125            }
1126        }
1127        PersistedIndexExpressionOp::Year
1128        | PersistedIndexExpressionOp::Month
1129        | PersistedIndexExpressionOp::Day => {
1130            if matches!(
1131                source_kind,
1132                PersistedFieldKind::Date | PersistedFieldKind::Timestamp
1133            ) {
1134                Some(PersistedFieldKind::Int)
1135            } else {
1136                None
1137            }
1138        }
1139    }
1140}
1141
1142fn validated_create_index_predicate_sql(
1143    predicate_sql: Option<&str>,
1144    schema: &SchemaInfo,
1145) -> Result<Option<String>, SqlDdlBindError> {
1146    let Some(predicate_sql) = predicate_sql else {
1147        return Ok(None);
1148    };
1149    let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
1150        SqlDdlBindError::InvalidFilteredIndexPredicate {
1151            detail: error.to_string(),
1152        }
1153    })?;
1154    validate_predicate(schema, &predicate).map_err(|error| {
1155        SqlDdlBindError::InvalidFilteredIndexPredicate {
1156            detail: error.to_string(),
1157        }
1158    })?;
1159
1160    Ok(Some(predicate_sql.to_string()))
1161}
1162
1163fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
1164    match field_paths {
1165        [field_path] => field_path.accepted_path().to_vec(),
1166        _ => vec![
1167            field_paths
1168                .iter()
1169                .map(|field_path| field_path.accepted_path().join("."))
1170                .collect::<Vec<_>>()
1171                .join(","),
1172        ],
1173    }
1174}
1175
1176fn ddl_key_item_report(key_items: &[BoundSqlDdlCreateIndexKey]) -> Vec<String> {
1177    match key_items {
1178        [key_item] => vec![ddl_key_item_text(key_item)],
1179        _ => vec![
1180            key_items
1181                .iter()
1182                .map(ddl_key_item_text)
1183                .collect::<Vec<_>>()
1184                .join(","),
1185        ],
1186    }
1187}
1188
1189fn ddl_key_item_text(key_item: &BoundSqlDdlCreateIndexKey) -> String {
1190    match key_item {
1191        BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.accepted_path().join("."),
1192        BoundSqlDdlCreateIndexKey::Expression(expression) => expression.canonical_sql().to_string(),
1193    }
1194}
1195
1196/// Lower one bound SQL DDL request through schema mutation admission.
1197pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
1198    request: &BoundSqlDdlRequest,
1199) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
1200    match request.statement() {
1201        BoundSqlDdlStatement::CreateIndex(create) => {
1202            if create.candidate_index().key().is_field_path_only() {
1203                admit_sql_ddl_field_path_index_candidate(create.candidate_index())
1204            } else {
1205                admit_sql_ddl_expression_index_candidate(create.candidate_index())
1206            }
1207        }
1208        BoundSqlDdlStatement::DropIndex(drop) => {
1209            admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
1210        }
1211        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1212    }
1213    .map_err(SqlDdlLoweringError::MutationAdmission)
1214}
1215
1216/// Derive the accepted-after schema snapshot for one bound SQL DDL request.
1217pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
1218    accepted_before: &AcceptedSchemaSnapshot,
1219    request: &BoundSqlDdlRequest,
1220) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
1221    match request.statement() {
1222        BoundSqlDdlStatement::CreateIndex(create) => {
1223            if create.candidate_index().key().is_field_path_only() {
1224                derive_sql_ddl_field_path_index_accepted_after(
1225                    accepted_before,
1226                    create.candidate_index().clone(),
1227                )
1228            } else {
1229                derive_sql_ddl_expression_index_accepted_after(
1230                    accepted_before,
1231                    create.candidate_index().clone(),
1232                )
1233            }
1234        }
1235        BoundSqlDdlStatement::DropIndex(drop) => {
1236            derive_sql_ddl_secondary_index_drop_accepted_after(
1237                accepted_before,
1238                drop.dropped_index(),
1239            )
1240        }
1241        BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
1242    }
1243    .map_err(SqlDdlLoweringError::MutationAdmission)
1244}
1245
1246fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
1247    match bound.statement() {
1248        BoundSqlDdlStatement::CreateIndex(create) => {
1249            let target = create.candidate_index();
1250
1251            SqlDdlPreparationReport {
1252                mutation_kind: if target.key().is_field_path_only() {
1253                    SqlDdlMutationKind::AddFieldPathIndex
1254                } else {
1255                    SqlDdlMutationKind::AddExpressionIndex
1256                },
1257                target_index: target.name().to_string(),
1258                target_store: target.store().to_string(),
1259                field_path: ddl_key_item_report(create.key_items()),
1260                execution_status: SqlDdlExecutionStatus::PreparedOnly,
1261                rows_scanned: 0,
1262                index_keys_written: 0,
1263            }
1264        }
1265        BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
1266            mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
1267            target_index: drop.index_name().to_string(),
1268            target_store: drop.dropped_index().store().to_string(),
1269            field_path: drop.field_path().to_vec(),
1270            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1271            rows_scanned: 0,
1272            index_keys_written: 0,
1273        },
1274        BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
1275            mutation_kind: no_op.mutation_kind(),
1276            target_index: no_op.index_name().to_string(),
1277            target_store: no_op.target_store().to_string(),
1278            field_path: no_op.field_path().to_vec(),
1279            execution_status: SqlDdlExecutionStatus::PreparedOnly,
1280            rows_scanned: 0,
1281            index_keys_written: 0,
1282        },
1283    }
1284}