#![allow(
dead_code,
reason = "DDL binding exposes prepare-only diagnostics and test-only inspection accessors"
)]
use crate::db::{
predicate::parse_sql_predicate,
query::predicate::validate_predicate,
schema::{
AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexExpressionOp,
PersistedIndexExpressionSnapshot, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
PersistedIndexSnapshot, SchemaDdlAcceptedSnapshotDerivation,
SchemaDdlIndexDropCandidateError, SchemaDdlMutationAdmission,
SchemaDdlMutationAdmissionError, SchemaExpressionIndexInfo,
SchemaExpressionIndexKeyItemInfo, SchemaInfo, admit_sql_ddl_expression_index_candidate,
admit_sql_ddl_field_path_index_candidate, admit_sql_ddl_secondary_index_drop_candidate,
derive_sql_ddl_expression_index_accepted_after,
derive_sql_ddl_field_path_index_accepted_after,
derive_sql_ddl_secondary_index_drop_accepted_after,
resolve_sql_ddl_secondary_index_drop_candidate,
},
sql::{
identifier::identifiers_tail_match,
parser::{
SqlAlterTableAddColumnStatement, SqlCreateIndexExpressionKey, SqlCreateIndexKeyItem,
SqlCreateIndexStatement, SqlCreateIndexUniqueness, SqlDdlStatement,
SqlDropIndexStatement, SqlStatement,
},
},
};
use thiserror::Error as ThisError;
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct PreparedSqlDdlCommand {
bound: BoundSqlDdlRequest,
derivation: Option<SchemaDdlAcceptedSnapshotDerivation>,
report: SqlDdlPreparationReport,
}
impl PreparedSqlDdlCommand {
#[must_use]
pub(in crate::db) const fn bound(&self) -> &BoundSqlDdlRequest {
&self.bound
}
#[must_use]
pub(in crate::db) const fn derivation(&self) -> Option<&SchemaDdlAcceptedSnapshotDerivation> {
self.derivation.as_ref()
}
#[must_use]
pub(in crate::db) const fn report(&self) -> &SqlDdlPreparationReport {
&self.report
}
#[must_use]
pub(in crate::db) const fn mutates_schema(&self) -> bool {
self.derivation.is_some()
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SqlDdlPreparationReport {
mutation_kind: SqlDdlMutationKind,
target_index: String,
target_store: String,
field_path: Vec<String>,
execution_status: SqlDdlExecutionStatus,
rows_scanned: usize,
index_keys_written: usize,
}
impl SqlDdlPreparationReport {
#[must_use]
pub const fn mutation_kind(&self) -> SqlDdlMutationKind {
self.mutation_kind
}
#[must_use]
pub const fn target_index(&self) -> &str {
self.target_index.as_str()
}
#[must_use]
pub const fn target_store(&self) -> &str {
self.target_store.as_str()
}
#[must_use]
pub const fn field_path(&self) -> &[String] {
self.field_path.as_slice()
}
#[must_use]
pub const fn execution_status(&self) -> SqlDdlExecutionStatus {
self.execution_status
}
#[must_use]
pub const fn rows_scanned(&self) -> usize {
self.rows_scanned
}
#[must_use]
pub const fn index_keys_written(&self) -> usize {
self.index_keys_written
}
pub(in crate::db) const fn with_execution_status(
mut self,
execution_status: SqlDdlExecutionStatus,
) -> Self {
self.execution_status = execution_status;
self
}
pub(in crate::db) const fn with_execution_metrics(
mut self,
rows_scanned: usize,
index_keys_written: usize,
) -> Self {
self.rows_scanned = rows_scanned;
self.index_keys_written = index_keys_written;
self
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SqlDdlMutationKind {
AddFieldPathIndex,
AddExpressionIndex,
DropSecondaryIndex,
}
impl SqlDdlMutationKind {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::AddFieldPathIndex => "add_field_path_index",
Self::AddExpressionIndex => "add_expression_index",
Self::DropSecondaryIndex => "drop_secondary_index",
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SqlDdlExecutionStatus {
PreparedOnly,
Published,
NoOp,
}
impl SqlDdlExecutionStatus {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::PreparedOnly => "prepared_only",
Self::Published => "published",
Self::NoOp => "no_op",
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct BoundSqlDdlRequest {
statement: BoundSqlDdlStatement,
}
impl BoundSqlDdlRequest {
#[must_use]
pub(in crate::db) const fn statement(&self) -> &BoundSqlDdlStatement {
&self.statement
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum BoundSqlDdlStatement {
CreateIndex(BoundSqlCreateIndexRequest),
DropIndex(BoundSqlDropIndexRequest),
NoOp(BoundSqlDdlNoOpRequest),
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct BoundSqlDdlNoOpRequest {
mutation_kind: SqlDdlMutationKind,
index_name: String,
entity_name: String,
target_store: String,
field_path: Vec<String>,
}
impl BoundSqlDdlNoOpRequest {
#[must_use]
pub(in crate::db) const fn mutation_kind(&self) -> SqlDdlMutationKind {
self.mutation_kind
}
#[must_use]
pub(in crate::db) const fn index_name(&self) -> &str {
self.index_name.as_str()
}
#[must_use]
pub(in crate::db) const fn entity_name(&self) -> &str {
self.entity_name.as_str()
}
#[must_use]
pub(in crate::db) const fn target_store(&self) -> &str {
self.target_store.as_str()
}
#[must_use]
pub(in crate::db) const fn field_path(&self) -> &[String] {
self.field_path.as_slice()
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct BoundSqlCreateIndexRequest {
index_name: String,
entity_name: String,
key_items: Vec<BoundSqlDdlCreateIndexKey>,
field_paths: Vec<BoundSqlDdlFieldPath>,
candidate_index: PersistedIndexSnapshot,
}
impl BoundSqlCreateIndexRequest {
#[must_use]
pub(in crate::db) const fn index_name(&self) -> &str {
self.index_name.as_str()
}
#[must_use]
pub(in crate::db) const fn entity_name(&self) -> &str {
self.entity_name.as_str()
}
#[must_use]
pub(in crate::db) const fn field_paths(&self) -> &[BoundSqlDdlFieldPath] {
self.field_paths.as_slice()
}
#[must_use]
pub(in crate::db) const fn key_items(&self) -> &[BoundSqlDdlCreateIndexKey] {
self.key_items.as_slice()
}
#[must_use]
pub(in crate::db) const fn candidate_index(&self) -> &PersistedIndexSnapshot {
&self.candidate_index
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct BoundSqlDropIndexRequest {
index_name: String,
entity_name: String,
dropped_index: PersistedIndexSnapshot,
field_path: Vec<String>,
}
impl BoundSqlDropIndexRequest {
#[must_use]
pub(in crate::db) const fn index_name(&self) -> &str {
self.index_name.as_str()
}
#[must_use]
pub(in crate::db) const fn entity_name(&self) -> &str {
self.entity_name.as_str()
}
#[must_use]
pub(in crate::db) const fn dropped_index(&self) -> &PersistedIndexSnapshot {
&self.dropped_index
}
#[must_use]
pub(in crate::db) const fn field_path(&self) -> &[String] {
self.field_path.as_slice()
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct BoundSqlDdlFieldPath {
root: String,
segments: Vec<String>,
accepted_path: Vec<String>,
}
impl BoundSqlDdlFieldPath {
#[must_use]
pub(in crate::db) const fn root(&self) -> &str {
self.root.as_str()
}
#[must_use]
pub(in crate::db) const fn segments(&self) -> &[String] {
self.segments.as_slice()
}
#[must_use]
pub(in crate::db) const fn accepted_path(&self) -> &[String] {
self.accepted_path.as_slice()
}
}
#[derive(Debug, Eq, PartialEq, ThisError)]
pub(in crate::db) enum SqlDdlBindError {
#[error("SQL DDL binder requires a DDL statement")]
NotDdl,
#[error("accepted schema does not expose an entity name")]
MissingEntityName,
#[error("accepted schema does not expose an entity path")]
MissingEntityPath,
#[error("SQL entity '{sql_entity}' does not match accepted entity '{expected_entity}'")]
EntityMismatch {
sql_entity: String,
expected_entity: String,
},
#[error("unknown field path '{field_path}' for accepted entity '{entity_name}'")]
UnknownFieldPath {
entity_name: String,
field_path: String,
},
#[error("field path '{field_path}' is not indexable")]
FieldPathNotIndexable { field_path: String },
#[error("field path '{field_path}' depends on generated-only metadata")]
FieldPathNotAcceptedCatalogBacked { field_path: String },
#[error("invalid filtered index predicate: {detail}")]
InvalidFilteredIndexPredicate { detail: String },
#[error("index name '{index_name}' already exists in the accepted schema")]
DuplicateIndexName { index_name: String },
#[error("accepted schema already has index '{existing_index}' for field path '{field_path}'")]
DuplicateFieldPathIndex {
field_path: String,
existing_index: String,
},
#[error("unknown index '{index_name}' for accepted entity '{entity_name}'")]
UnknownIndex {
entity_name: String,
index_name: String,
},
#[error(
"index '{index_name}' is generated by the entity model and cannot be dropped with SQL DDL; remove the index from the entity schema macro instead"
)]
GeneratedIndexDropRejected { index_name: String },
#[error(
"index '{index_name}' is not a supported DDL-droppable secondary index; SQL DDL can currently drop only indexes created through SQL DDL"
)]
UnsupportedDropIndex { index_name: String },
#[error(
"SQL DDL ALTER TABLE ADD COLUMN is not executable yet for accepted entity '{entity_name}' column '{column_name}'"
)]
UnsupportedAlterTableAddColumn {
entity_name: String,
column_name: String,
},
}
#[derive(Debug, Eq, PartialEq, ThisError)]
pub(in crate::db) enum SqlDdlLoweringError {
#[error("SQL DDL lowering requires a supported DDL statement")]
UnsupportedStatement,
#[error("schema mutation admission rejected DDL candidate: {0:?}")]
MutationAdmission(SchemaDdlMutationAdmissionError),
}
#[derive(Debug, Eq, PartialEq, ThisError)]
pub(in crate::db) enum SqlDdlPrepareError {
#[error("{0}")]
Bind(#[from] SqlDdlBindError),
#[error("{0}")]
Lowering(#[from] SqlDdlLoweringError),
}
pub(in crate::db) fn prepare_sql_ddl_statement(
statement: &SqlStatement,
accepted_before: &AcceptedSchemaSnapshot,
schema: &SchemaInfo,
index_store_path: &'static str,
) -> Result<PreparedSqlDdlCommand, SqlDdlPrepareError> {
let bound = bind_sql_ddl_statement(statement, accepted_before, schema, index_store_path)?;
let derivation = if matches!(bound.statement(), BoundSqlDdlStatement::NoOp(_)) {
None
} else {
Some(derive_bound_sql_ddl_accepted_after(
accepted_before,
&bound,
)?)
};
let report = ddl_preparation_report(&bound);
Ok(PreparedSqlDdlCommand {
bound,
derivation,
report,
})
}
pub(in crate::db) fn bind_sql_ddl_statement(
statement: &SqlStatement,
accepted_before: &AcceptedSchemaSnapshot,
schema: &SchemaInfo,
index_store_path: &'static str,
) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
let SqlStatement::Ddl(ddl) = statement else {
return Err(SqlDdlBindError::NotDdl);
};
match ddl {
SqlDdlStatement::CreateIndex(statement) => {
bind_create_index_statement(statement, schema, index_store_path)
}
SqlDdlStatement::DropIndex(statement) => {
bind_drop_index_statement(statement, accepted_before, schema)
}
SqlDdlStatement::AlterTableAddColumn(statement) => {
bind_alter_table_add_column_statement(statement, schema)
}
}
}
fn bind_create_index_statement(
statement: &SqlCreateIndexStatement,
schema: &SchemaInfo,
index_store_path: &'static str,
) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
let entity_name = schema
.entity_name()
.ok_or(SqlDdlBindError::MissingEntityName)?;
if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
return Err(SqlDdlBindError::EntityMismatch {
sql_entity: statement.entity.clone(),
expected_entity: entity_name.to_string(),
});
}
let key_items = statement
.key_items
.iter()
.map(|key_item| bind_create_index_key_item(key_item, entity_name, schema))
.collect::<Result<Vec<_>, _>>()?;
let field_paths = create_index_field_path_report_items(key_items.as_slice());
if let Some(existing_index) = find_field_path_index_by_name(schema, statement.name.as_str()) {
if key_items_are_field_path_only(key_items.as_slice())
&& statement.if_not_exists
&& existing_field_path_index_matches_request(
existing_index,
field_paths.as_slice(),
statement.predicate_sql.as_deref(),
statement.uniqueness,
)
{
return Ok(BoundSqlDdlRequest {
statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
mutation_kind: SqlDdlMutationKind::AddFieldPathIndex,
index_name: statement.name.clone(),
entity_name: entity_name.to_string(),
target_store: existing_index.store().to_string(),
field_path: ddl_field_path_report(field_paths.as_slice()),
}),
});
}
return Err(SqlDdlBindError::DuplicateIndexName {
index_name: statement.name.clone(),
});
}
let predicate_sql =
validated_create_index_predicate_sql(statement.predicate_sql.as_deref(), schema)?;
if let Some(existing_index) = find_expression_index_by_name(schema, statement.name.as_str()) {
if statement.if_not_exists
&& existing_expression_index_matches_request(
existing_index,
key_items.as_slice(),
predicate_sql.as_deref(),
statement.uniqueness,
)
{
return Ok(BoundSqlDdlRequest {
statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
mutation_kind: SqlDdlMutationKind::AddExpressionIndex,
index_name: statement.name.clone(),
entity_name: entity_name.to_string(),
target_store: existing_index.store().to_string(),
field_path: ddl_key_item_report(key_items.as_slice()),
}),
});
}
return Err(SqlDdlBindError::DuplicateIndexName {
index_name: statement.name.clone(),
});
}
if key_items_are_field_path_only(key_items.as_slice()) {
reject_duplicate_field_path_index(
field_paths.as_slice(),
predicate_sql.as_deref(),
schema,
)?;
} else {
reject_duplicate_expression_index(key_items.as_slice(), predicate_sql.as_deref(), schema)?;
}
let candidate_index = candidate_index_snapshot(
statement.name.as_str(),
key_items.as_slice(),
predicate_sql.as_deref(),
statement.uniqueness,
schema,
index_store_path,
)?;
Ok(BoundSqlDdlRequest {
statement: BoundSqlDdlStatement::CreateIndex(BoundSqlCreateIndexRequest {
index_name: statement.name.clone(),
entity_name: entity_name.to_string(),
key_items,
field_paths,
candidate_index,
}),
})
}
fn bind_drop_index_statement(
statement: &SqlDropIndexStatement,
accepted_before: &AcceptedSchemaSnapshot,
schema: &SchemaInfo,
) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
let entity_name = schema
.entity_name()
.ok_or(SqlDdlBindError::MissingEntityName)?;
if let Some(sql_entity) = statement.entity.as_deref()
&& !identifiers_tail_match(sql_entity, entity_name)
{
return Err(SqlDdlBindError::EntityMismatch {
sql_entity: sql_entity.to_string(),
expected_entity: entity_name.to_string(),
});
}
let drop_candidate = resolve_sql_ddl_secondary_index_drop_candidate(
accepted_before,
&statement.name,
)
.map_err(|error| match error {
SchemaDdlIndexDropCandidateError::Generated => {
SqlDdlBindError::GeneratedIndexDropRejected {
index_name: statement.name.clone(),
}
}
SchemaDdlIndexDropCandidateError::Unknown => SqlDdlBindError::UnknownIndex {
entity_name: entity_name.to_string(),
index_name: statement.name.clone(),
},
SchemaDdlIndexDropCandidateError::Unsupported => SqlDdlBindError::UnsupportedDropIndex {
index_name: statement.name.clone(),
},
});
let (dropped_index, field_path) = match drop_candidate {
Ok((dropped_index, field_path)) => (dropped_index, field_path),
Err(SqlDdlBindError::UnknownIndex { .. }) if statement.if_exists => {
return Ok(BoundSqlDdlRequest {
statement: BoundSqlDdlStatement::NoOp(BoundSqlDdlNoOpRequest {
mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
index_name: statement.name.clone(),
entity_name: entity_name.to_string(),
target_store: "-".to_string(),
field_path: Vec::new(),
}),
});
}
Err(error) => return Err(error),
};
Ok(BoundSqlDdlRequest {
statement: BoundSqlDdlStatement::DropIndex(BoundSqlDropIndexRequest {
index_name: statement.name.clone(),
entity_name: entity_name.to_string(),
dropped_index,
field_path,
}),
})
}
fn bind_alter_table_add_column_statement(
statement: &SqlAlterTableAddColumnStatement,
schema: &SchemaInfo,
) -> Result<BoundSqlDdlRequest, SqlDdlBindError> {
let entity_name = schema
.entity_name()
.ok_or(SqlDdlBindError::MissingEntityName)?;
if !identifiers_tail_match(statement.entity.as_str(), entity_name) {
return Err(SqlDdlBindError::EntityMismatch {
sql_entity: statement.entity.clone(),
expected_entity: entity_name.to_string(),
});
}
Err(SqlDdlBindError::UnsupportedAlterTableAddColumn {
entity_name: entity_name.to_string(),
column_name: statement.column_name.clone(),
})
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum BoundSqlDdlCreateIndexKey {
FieldPath(BoundSqlDdlFieldPath),
Expression(BoundSqlDdlExpressionKey),
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct BoundSqlDdlExpressionKey {
op: PersistedIndexExpressionOp,
source: BoundSqlDdlFieldPath,
canonical_sql: String,
}
impl BoundSqlDdlExpressionKey {
#[must_use]
pub(in crate::db) const fn op(&self) -> PersistedIndexExpressionOp {
self.op
}
#[must_use]
pub(in crate::db) const fn source(&self) -> &BoundSqlDdlFieldPath {
&self.source
}
#[must_use]
pub(in crate::db) const fn canonical_sql(&self) -> &str {
self.canonical_sql.as_str()
}
}
fn bind_create_index_key_item(
key_item: &SqlCreateIndexKeyItem,
entity_name: &str,
schema: &SchemaInfo,
) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
match key_item {
SqlCreateIndexKeyItem::FieldPath(field_path) => {
bind_create_index_field_path(field_path.as_str(), entity_name, schema)
.map(BoundSqlDdlCreateIndexKey::FieldPath)
}
SqlCreateIndexKeyItem::Expression(expression) => {
bind_create_index_expression_key(expression, entity_name, schema)
}
}
}
fn bind_create_index_expression_key(
expression: &SqlCreateIndexExpressionKey,
entity_name: &str,
schema: &SchemaInfo,
) -> Result<BoundSqlDdlCreateIndexKey, SqlDdlBindError> {
let source = bind_create_index_field_path(expression.field_path.as_str(), entity_name, schema)?;
Ok(BoundSqlDdlCreateIndexKey::Expression(
BoundSqlDdlExpressionKey {
op: expression_op_from_sql_function(expression.function),
source,
canonical_sql: expression.canonical_sql(),
},
))
}
const fn expression_op_from_sql_function(
function: crate::db::sql::parser::SqlCreateIndexExpressionFunction,
) -> PersistedIndexExpressionOp {
match function {
crate::db::sql::parser::SqlCreateIndexExpressionFunction::Lower => {
PersistedIndexExpressionOp::Lower
}
crate::db::sql::parser::SqlCreateIndexExpressionFunction::Upper => {
PersistedIndexExpressionOp::Upper
}
crate::db::sql::parser::SqlCreateIndexExpressionFunction::Trim => {
PersistedIndexExpressionOp::Trim
}
}
}
fn key_items_are_field_path_only(key_items: &[BoundSqlDdlCreateIndexKey]) -> bool {
key_items
.iter()
.all(|key_item| matches!(key_item, BoundSqlDdlCreateIndexKey::FieldPath(_)))
}
fn create_index_field_path_report_items(
key_items: &[BoundSqlDdlCreateIndexKey],
) -> Vec<BoundSqlDdlFieldPath> {
key_items
.iter()
.map(|key_item| match key_item {
BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.clone(),
BoundSqlDdlCreateIndexKey::Expression(expression) => expression.source().clone(),
})
.collect()
}
fn bind_create_index_field_path(
field_path: &str,
entity_name: &str,
schema: &SchemaInfo,
) -> Result<BoundSqlDdlFieldPath, SqlDdlBindError> {
let mut path = field_path
.split('.')
.map(str::trim)
.filter(|segment| !segment.is_empty());
let Some(root) = path.next() else {
return Err(SqlDdlBindError::UnknownFieldPath {
entity_name: entity_name.to_string(),
field_path: field_path.to_string(),
});
};
let segments = path.map(str::to_string).collect::<Vec<_>>();
let capabilities = if segments.is_empty() {
schema.sql_capabilities(root)
} else {
schema.nested_sql_capabilities(root, segments.as_slice())
}
.ok_or_else(|| SqlDdlBindError::UnknownFieldPath {
entity_name: entity_name.to_string(),
field_path: field_path.to_string(),
})?;
if !capabilities.orderable() {
return Err(SqlDdlBindError::FieldPathNotIndexable {
field_path: field_path.to_string(),
});
}
let mut accepted_path = Vec::with_capacity(segments.len() + 1);
accepted_path.push(root.to_string());
accepted_path.extend(segments.iter().cloned());
Ok(BoundSqlDdlFieldPath {
root: root.to_string(),
segments,
accepted_path,
})
}
fn find_field_path_index_by_name<'a>(
schema: &'a SchemaInfo,
index_name: &str,
) -> Option<&'a crate::db::schema::SchemaIndexInfo> {
schema
.field_path_indexes()
.iter()
.find(|index| index.name() == index_name)
}
fn existing_field_path_index_matches_request(
index: &crate::db::schema::SchemaIndexInfo,
field_paths: &[BoundSqlDdlFieldPath],
predicate_sql: Option<&str>,
uniqueness: SqlCreateIndexUniqueness,
) -> bool {
let fields = index.fields();
index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
&& index.predicate_sql() == predicate_sql
&& fields.len() == field_paths.len()
&& fields
.iter()
.zip(field_paths)
.all(|(field, requested)| field.path() == requested.accepted_path())
}
fn find_expression_index_by_name<'a>(
schema: &'a SchemaInfo,
index_name: &str,
) -> Option<&'a SchemaExpressionIndexInfo> {
schema
.expression_indexes()
.iter()
.find(|index| index.name() == index_name)
}
fn existing_expression_index_matches_request(
index: &SchemaExpressionIndexInfo,
key_items: &[BoundSqlDdlCreateIndexKey],
predicate_sql: Option<&str>,
uniqueness: SqlCreateIndexUniqueness,
) -> bool {
let existing_key_items = index.key_items();
index.unique() == matches!(uniqueness, SqlCreateIndexUniqueness::Unique)
&& index.predicate_sql() == predicate_sql
&& existing_key_items.len() == key_items.len()
&& existing_key_items
.iter()
.zip(key_items)
.all(existing_expression_key_item_matches_request)
}
fn existing_expression_key_item_matches_request(
existing: (
&SchemaExpressionIndexKeyItemInfo,
&BoundSqlDdlCreateIndexKey,
),
) -> bool {
let (existing, requested) = existing;
match (existing, requested) {
(
SchemaExpressionIndexKeyItemInfo::FieldPath(existing),
BoundSqlDdlCreateIndexKey::FieldPath(requested),
) => existing.path() == requested.accepted_path(),
(
SchemaExpressionIndexKeyItemInfo::Expression(existing),
BoundSqlDdlCreateIndexKey::Expression(requested),
) => existing_expression_component_matches_request(
existing.op(),
existing.source().path(),
existing.canonical_text(),
requested,
),
_ => false,
}
}
fn existing_expression_component_matches_request(
existing_op: PersistedIndexExpressionOp,
existing_path: &[String],
existing_canonical_text: &str,
requested: &BoundSqlDdlExpressionKey,
) -> bool {
let requested_path = requested.source().accepted_path();
let requested_canonical_text = format!("expr:v1:{}", requested.canonical_sql());
existing_op == requested.op()
&& existing_path == requested_path
&& existing_canonical_text == requested_canonical_text
}
fn reject_duplicate_expression_index(
key_items: &[BoundSqlDdlCreateIndexKey],
predicate_sql: Option<&str>,
schema: &SchemaInfo,
) -> Result<(), SqlDdlBindError> {
let Some(existing_index) = schema.expression_indexes().iter().find(|index| {
existing_expression_index_matches_request(
index,
key_items,
predicate_sql,
if index.unique() {
SqlCreateIndexUniqueness::Unique
} else {
SqlCreateIndexUniqueness::NonUnique
},
)
}) else {
return Ok(());
};
Err(SqlDdlBindError::DuplicateFieldPathIndex {
field_path: ddl_key_item_report(key_items).join(","),
existing_index: existing_index.name().to_string(),
})
}
fn reject_duplicate_field_path_index(
field_paths: &[BoundSqlDdlFieldPath],
predicate_sql: Option<&str>,
schema: &SchemaInfo,
) -> Result<(), SqlDdlBindError> {
let Some(existing_index) = schema.field_path_indexes().iter().find(|index| {
let fields = index.fields();
index.predicate_sql() == predicate_sql
&& fields.len() == field_paths.len()
&& fields
.iter()
.zip(field_paths)
.all(|(field, requested)| field.path() == requested.accepted_path())
}) else {
return Ok(());
};
Err(SqlDdlBindError::DuplicateFieldPathIndex {
field_path: ddl_field_path_report(field_paths).join(","),
existing_index: existing_index.name().to_string(),
})
}
fn candidate_index_snapshot(
index_name: &str,
key_items: &[BoundSqlDdlCreateIndexKey],
predicate_sql: Option<&str>,
uniqueness: SqlCreateIndexUniqueness,
schema: &SchemaInfo,
index_store_path: &'static str,
) -> Result<PersistedIndexSnapshot, SqlDdlBindError> {
let key = if key_items_are_field_path_only(key_items) {
PersistedIndexKeySnapshot::FieldPath(
key_items
.iter()
.map(|key_item| {
let BoundSqlDdlCreateIndexKey::FieldPath(field_path) = key_item else {
unreachable!("field-path-only index checked before field-path lowering");
};
accepted_index_field_path_snapshot(schema, field_path)
})
.collect::<Result<Vec<_>, _>>()?,
)
} else {
PersistedIndexKeySnapshot::Items(
key_items
.iter()
.map(|key_item| match key_item {
BoundSqlDdlCreateIndexKey::FieldPath(field_path) => {
accepted_index_field_path_snapshot(schema, field_path)
.map(PersistedIndexKeyItemSnapshot::FieldPath)
}
BoundSqlDdlCreateIndexKey::Expression(expression) => {
accepted_index_expression_snapshot(schema, expression)
}
})
.collect::<Result<Vec<_>, _>>()?,
)
};
Ok(PersistedIndexSnapshot::new_sql_ddl(
schema.next_secondary_index_ordinal(),
index_name.to_string(),
index_store_path.to_string(),
matches!(uniqueness, SqlCreateIndexUniqueness::Unique),
key,
predicate_sql.map(str::to_string),
))
}
fn accepted_index_field_path_snapshot(
schema: &SchemaInfo,
field_path: &BoundSqlDdlFieldPath,
) -> Result<crate::db::schema::PersistedIndexFieldPathSnapshot, SqlDdlBindError> {
schema
.accepted_index_field_path_snapshot(field_path.root(), field_path.segments())
.ok_or_else(|| SqlDdlBindError::FieldPathNotAcceptedCatalogBacked {
field_path: field_path.accepted_path().join("."),
})
}
fn accepted_index_expression_snapshot(
schema: &SchemaInfo,
expression: &BoundSqlDdlExpressionKey,
) -> Result<PersistedIndexKeyItemSnapshot, SqlDdlBindError> {
let source = accepted_index_field_path_snapshot(schema, expression.source())?;
let Some(output_kind) = expression_output_kind(expression.op(), source.kind()) else {
return Err(SqlDdlBindError::FieldPathNotIndexable {
field_path: expression.source().accepted_path().join("."),
});
};
Ok(PersistedIndexKeyItemSnapshot::Expression(Box::new(
PersistedIndexExpressionSnapshot::new(
expression.op(),
source.clone(),
source.kind().clone(),
output_kind,
format!("expr:v1:{}", expression.canonical_sql()),
),
)))
}
fn expression_output_kind(
op: PersistedIndexExpressionOp,
source_kind: &PersistedFieldKind,
) -> Option<PersistedFieldKind> {
match op {
PersistedIndexExpressionOp::Lower
| PersistedIndexExpressionOp::Upper
| PersistedIndexExpressionOp::Trim
| PersistedIndexExpressionOp::LowerTrim => {
if matches!(source_kind, PersistedFieldKind::Text { .. }) {
Some(source_kind.clone())
} else {
None
}
}
PersistedIndexExpressionOp::Date => {
if matches!(
source_kind,
PersistedFieldKind::Date | PersistedFieldKind::Timestamp
) {
Some(PersistedFieldKind::Date)
} else {
None
}
}
PersistedIndexExpressionOp::Year
| PersistedIndexExpressionOp::Month
| PersistedIndexExpressionOp::Day => {
if matches!(
source_kind,
PersistedFieldKind::Date | PersistedFieldKind::Timestamp
) {
Some(PersistedFieldKind::Int)
} else {
None
}
}
}
}
fn validated_create_index_predicate_sql(
predicate_sql: Option<&str>,
schema: &SchemaInfo,
) -> Result<Option<String>, SqlDdlBindError> {
let Some(predicate_sql) = predicate_sql else {
return Ok(None);
};
let predicate = parse_sql_predicate(predicate_sql).map_err(|error| {
SqlDdlBindError::InvalidFilteredIndexPredicate {
detail: error.to_string(),
}
})?;
validate_predicate(schema, &predicate).map_err(|error| {
SqlDdlBindError::InvalidFilteredIndexPredicate {
detail: error.to_string(),
}
})?;
Ok(Some(predicate_sql.to_string()))
}
fn ddl_field_path_report(field_paths: &[BoundSqlDdlFieldPath]) -> Vec<String> {
match field_paths {
[field_path] => field_path.accepted_path().to_vec(),
_ => vec![
field_paths
.iter()
.map(|field_path| field_path.accepted_path().join("."))
.collect::<Vec<_>>()
.join(","),
],
}
}
fn ddl_key_item_report(key_items: &[BoundSqlDdlCreateIndexKey]) -> Vec<String> {
match key_items {
[key_item] => vec![ddl_key_item_text(key_item)],
_ => vec![
key_items
.iter()
.map(ddl_key_item_text)
.collect::<Vec<_>>()
.join(","),
],
}
}
fn ddl_key_item_text(key_item: &BoundSqlDdlCreateIndexKey) -> String {
match key_item {
BoundSqlDdlCreateIndexKey::FieldPath(field_path) => field_path.accepted_path().join("."),
BoundSqlDdlCreateIndexKey::Expression(expression) => expression.canonical_sql().to_string(),
}
}
pub(in crate::db) fn lower_bound_sql_ddl_to_schema_mutation_admission(
request: &BoundSqlDdlRequest,
) -> Result<SchemaDdlMutationAdmission, SqlDdlLoweringError> {
match request.statement() {
BoundSqlDdlStatement::CreateIndex(create) => {
if create.candidate_index().key().is_field_path_only() {
admit_sql_ddl_field_path_index_candidate(create.candidate_index())
} else {
admit_sql_ddl_expression_index_candidate(create.candidate_index())
}
}
BoundSqlDdlStatement::DropIndex(drop) => {
admit_sql_ddl_secondary_index_drop_candidate(drop.dropped_index())
}
BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
}
.map_err(SqlDdlLoweringError::MutationAdmission)
}
pub(in crate::db) fn derive_bound_sql_ddl_accepted_after(
accepted_before: &AcceptedSchemaSnapshot,
request: &BoundSqlDdlRequest,
) -> Result<SchemaDdlAcceptedSnapshotDerivation, SqlDdlLoweringError> {
match request.statement() {
BoundSqlDdlStatement::CreateIndex(create) => {
if create.candidate_index().key().is_field_path_only() {
derive_sql_ddl_field_path_index_accepted_after(
accepted_before,
create.candidate_index().clone(),
)
} else {
derive_sql_ddl_expression_index_accepted_after(
accepted_before,
create.candidate_index().clone(),
)
}
}
BoundSqlDdlStatement::DropIndex(drop) => {
derive_sql_ddl_secondary_index_drop_accepted_after(
accepted_before,
drop.dropped_index(),
)
}
BoundSqlDdlStatement::NoOp(_) => return Err(SqlDdlLoweringError::UnsupportedStatement),
}
.map_err(SqlDdlLoweringError::MutationAdmission)
}
fn ddl_preparation_report(bound: &BoundSqlDdlRequest) -> SqlDdlPreparationReport {
match bound.statement() {
BoundSqlDdlStatement::CreateIndex(create) => {
let target = create.candidate_index();
SqlDdlPreparationReport {
mutation_kind: if target.key().is_field_path_only() {
SqlDdlMutationKind::AddFieldPathIndex
} else {
SqlDdlMutationKind::AddExpressionIndex
},
target_index: target.name().to_string(),
target_store: target.store().to_string(),
field_path: ddl_key_item_report(create.key_items()),
execution_status: SqlDdlExecutionStatus::PreparedOnly,
rows_scanned: 0,
index_keys_written: 0,
}
}
BoundSqlDdlStatement::DropIndex(drop) => SqlDdlPreparationReport {
mutation_kind: SqlDdlMutationKind::DropSecondaryIndex,
target_index: drop.index_name().to_string(),
target_store: drop.dropped_index().store().to_string(),
field_path: drop.field_path().to_vec(),
execution_status: SqlDdlExecutionStatus::PreparedOnly,
rows_scanned: 0,
index_keys_written: 0,
},
BoundSqlDdlStatement::NoOp(no_op) => SqlDdlPreparationReport {
mutation_kind: no_op.mutation_kind(),
target_index: no_op.index_name().to_string(),
target_store: no_op.target_store().to_string(),
field_path: no_op.field_path().to_vec(),
execution_status: SqlDdlExecutionStatus::PreparedOnly,
rows_scanned: 0,
index_keys_written: 0,
},
}
}