use sea_query::{Alias, Index};
use vespertide_core::{KeepPolicy, TableConstraint, TableDef, UniqueConstraintStrategy};
use super::super::helpers::{build_unique_constraint_name, quote_ident};
use super::super::types::{BuiltQuery, DatabaseBackend, RawSql};
use crate::error::QueryError;
pub(super) fn build_unique<T: AsRef<str>>(
backend: DatabaseBackend,
table: &str,
name: Option<&str>,
columns: &[T],
strategy: &UniqueConstraintStrategy,
current_schema: &[TableDef],
) -> Result<Vec<BuiltQuery>, QueryError> {
let mut queries = build_pre_cleanup_delete(backend, table, columns, strategy, current_schema)?;
queries.push(build_unique_index(table, name, columns));
Ok(queries)
}
fn build_pre_cleanup_delete<T: AsRef<str>>(
backend: DatabaseBackend,
table: &str,
columns: &[T],
strategy: &UniqueConstraintStrategy,
current_schema: &[TableDef],
) -> Result<Vec<BuiltQuery>, QueryError> {
let keep = match strategy {
UniqueConstraintStrategy::DeleteDuplicates { keep } => *keep,
#[cfg(not(tarpaulin_include))]
_ => {
return Err(QueryError::UnsupportedAction(format!(
"AddConstraint(Unique) on '{table}': unsupported strategy variant"
)));
}
};
let Some(pk_column) = try_resolve_single_pk_column(table, current_schema, columns) else {
return Ok(vec![]);
};
let agg = match keep {
KeepPolicy::First => "MIN",
KeepPolicy::Last => "MAX",
};
let quoted_table = quote_ident(table, backend);
let quoted_pk = quote_ident(&pk_column, backend);
let quoted_unique_cols: Vec<String> = columns
.iter()
.map(|c| quote_ident(c.as_ref(), backend))
.collect();
let group_by = quoted_unique_cols.join(", ");
let sql = format!(
"DELETE FROM {quoted_table} WHERE {quoted_pk} NOT IN (\
SELECT {agg}({quoted_pk}) FROM {quoted_table} GROUP BY {group_by})",
);
Ok(vec![BuiltQuery::Raw(RawSql::uniform(sql))])
}
fn try_resolve_single_pk_column<T: AsRef<str>>(
table: &str,
current_schema: &[TableDef],
unique_columns: &[T],
) -> Option<String> {
let table_def = current_schema.iter().find(|t| t.name.as_str() == table)?;
let pk_columns: Vec<String> = table_def
.constraints
.iter()
.find_map(|c| {
if let TableConstraint::PrimaryKey { columns, .. } = c {
Some(columns.iter().map(ToString::to_string).collect())
} else {
None
}
})
.or_else(|| {
let inline: Vec<String> = table_def
.columns
.iter()
.filter(|col| col.primary_key.is_some())
.map(|col| col.name.to_string())
.collect();
if inline.is_empty() {
None
} else {
Some(inline)
}
})?;
if pk_columns.len() != 1 {
return None;
}
let pk_column = pk_columns.into_iter().next().expect("len == 1");
let unique_set: Vec<&str> = unique_columns.iter().map(AsRef::as_ref).collect();
if unique_set.iter().any(|c| *c == pk_column) {
return None;
}
Some(pk_column)
}
fn build_unique_index<T: AsRef<str>>(table: &str, name: Option<&str>, columns: &[T]) -> BuiltQuery {
let index_name = build_unique_constraint_name(table, columns, name);
let mut idx = Index::create()
.table(Alias::new(table))
.name(&index_name)
.unique()
.to_owned();
for col in columns {
idx.col(Alias::new(col.as_ref()));
}
BuiltQuery::CreateIndex(Box::new(idx))
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::rstest;
use vespertide_core::{ColumnDef, ColumnType, SimpleColumnType};
fn schema_with_single_pk() -> Vec<TableDef> {
let mut id_col = ColumnDef::new("id", ColumnType::Simple(SimpleColumnType::Integer), false);
id_col.primary_key =
Some(vespertide_core::schema::primary_key::PrimaryKeySyntax::Bool(true));
vec![TableDef {
name: "users".into(),
description: None,
columns: vec![
id_col,
ColumnDef::new("email", ColumnType::Simple(SimpleColumnType::Text), false),
],
constraints: vec![],
}]
}
#[rstest]
#[case::keep_first(KeepPolicy::First, "MIN")]
#[case::keep_last(KeepPolicy::Last, "MAX")]
fn build_pre_cleanup_delete_emits_pk_keyed_delete(
#[case] keep: KeepPolicy,
#[case] expected_agg: &str,
) {
let schema = schema_with_single_pk();
let queries = build_pre_cleanup_delete(
DatabaseBackend::Postgres,
"users",
&["email"],
&UniqueConstraintStrategy::DeleteDuplicates { keep },
&schema,
)
.expect("body should succeed");
assert_eq!(queries.len(), 1);
let sql = queries[0].build(DatabaseBackend::Postgres);
assert!(sql.contains("DELETE FROM"));
assert!(sql.contains(expected_agg));
assert!(sql.contains("\"id\""));
assert!(sql.contains("\"email\""));
}
#[test]
fn build_pre_cleanup_delete_no_pk_returns_empty() {
let queries = build_pre_cleanup_delete(
DatabaseBackend::Postgres,
"nonexistent",
&["email"],
&UniqueConstraintStrategy::DeleteDuplicates {
keep: KeepPolicy::First,
},
&[],
)
.expect("None pk → Ok(empty)");
assert!(queries.is_empty());
}
#[test]
fn try_resolve_single_pk_column_returns_none_for_composite_pk() {
let schema = vec![TableDef {
name: "events".into(),
description: None,
columns: vec![
ColumnDef::new(
"tenant_id",
ColumnType::Simple(SimpleColumnType::Integer),
false,
),
ColumnDef::new("ts", ColumnType::Simple(SimpleColumnType::BigInt), false),
],
constraints: vec![TableConstraint::PrimaryKey {
auto_increment: false,
columns: vec!["tenant_id".into(), "ts".into()],
strategy: vespertide_core::PrimaryKeyAdditionStrategy::default(),
}],
}];
let resolved = try_resolve_single_pk_column("events", &schema, &["new_col"]);
assert!(resolved.is_none());
}
#[test]
fn try_resolve_single_pk_column_returns_none_when_pk_in_unique_set() {
let schema = schema_with_single_pk();
let resolved = try_resolve_single_pk_column("users", &schema, &["id", "email"]);
assert!(resolved.is_none());
}
}