use sqlx::{MySql, Pool, Postgres, Row, Sqlite};
pub(crate) async fn validate_postgres_table_schema<E>(
pool: &Pool<Postgres>,
table_name: &str,
expected_columns: &[(&str, &str)],
error_mapper: impl Fn(String) -> E,
) -> Result<(), E> {
let table_exists: bool = sqlx::query_scalar(
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)",
)
.bind(table_name)
.fetch_one(pool)
.await
.map_err(|e| error_mapper(e.to_string()))?;
if !table_exists {
return Err(error_mapper(format!(
"Schema validation failed: Table '{table_name}' does not exist"
)));
}
let rows = sqlx::query(
"SELECT column_name, data_type FROM information_schema.columns
WHERE table_name = $1 ORDER BY column_name",
)
.bind(table_name)
.fetch_all(pool)
.await
.map_err(|e| error_mapper(e.to_string()))?;
let actual_columns: Vec<(String, String)> = rows
.iter()
.map(|row| {
let name: String = row.get("column_name");
let type_: String = row.get("data_type");
(name, type_)
})
.collect();
for (expected_name, expected_type) in expected_columns {
let found = actual_columns
.iter()
.find(|(name, _)| name == expected_name);
match found {
Some((_, actual_type)) if actual_type == expected_type => {
}
Some((_, actual_type)) => {
return Err(error_mapper(format!(
"Schema validation failed: Column '{expected_name}' has type '{actual_type}' but expected '{expected_type}'"
)));
}
None => {
return Err(error_mapper(format!(
"Schema validation failed: Missing column '{expected_name}'"
)));
}
}
}
for (actual_name, _) in &actual_columns {
if !expected_columns
.iter()
.any(|(name, _)| *name == actual_name)
{
tracing::warn!(
"Extra column '{}' found in table '{}'",
actual_name,
table_name
);
}
}
Ok(())
}
pub(crate) async fn validate_mysql_table_schema<E>(
pool: &Pool<MySql>,
table_name: &str,
expected_columns: &[(&str, &str)],
error_mapper: impl Fn(String) -> E,
) -> Result<(), E> {
let table_exists: bool = sqlx::query_scalar(
"SELECT COUNT(*) > 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ?",
)
.bind(table_name)
.fetch_one(pool)
.await
.map_err(|e| error_mapper(e.to_string()))?;
if !table_exists {
return Err(error_mapper(format!(
"Schema validation failed: Table '{table_name}' does not exist"
)));
}
let rows = sqlx::query(
"SELECT CAST(COLUMN_NAME AS CHAR) AS col_name, CAST(DATA_TYPE AS CHAR) AS data_type FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ? ORDER BY COLUMN_NAME",
)
.bind(table_name)
.fetch_all(pool)
.await
.map_err(|e| error_mapper(e.to_string()))?;
let actual_columns: Vec<(String, String)> = rows
.iter()
.map(|row| {
let name: String = row.get("col_name");
let type_: String = row.get("data_type");
(name, type_)
})
.collect();
for (expected_name, expected_type) in expected_columns {
let found = actual_columns
.iter()
.find(|(name, _)| name == expected_name);
match found {
Some((_, actual_type)) if mysql_types_compatible(actual_type, expected_type) => {
}
Some((_, actual_type)) => {
return Err(error_mapper(format!(
"Schema validation failed: Column '{expected_name}' has type '{actual_type}' but expected '{expected_type}'"
)));
}
None => {
return Err(error_mapper(format!(
"Schema validation failed: Missing column '{expected_name}'"
)));
}
}
}
for (actual_name, _) in &actual_columns {
if !expected_columns
.iter()
.any(|(name, _)| *name == actual_name)
{
tracing::warn!(
"Extra column '{}' found in table '{}'",
actual_name,
table_name
);
}
}
Ok(())
}
pub(crate) async fn validate_sqlite_table_schema<E>(
pool: &Pool<Sqlite>,
table_name: &str,
expected_columns: &[(&str, &str)],
error_mapper: impl Fn(String) -> E,
) -> Result<(), E> {
let table_exists: bool = sqlx::query_scalar(
"SELECT EXISTS (SELECT name FROM sqlite_master WHERE type='table' AND name=?)",
)
.bind(table_name)
.fetch_one(pool)
.await
.map_err(|e| error_mapper(e.to_string()))?;
if !table_exists {
return Err(error_mapper(format!(
"Schema validation failed: Table '{table_name}' does not exist"
)));
}
let pragma_sql = format!("PRAGMA table_info('{table_name}');");
let rows = sqlx::query(pragma_sql.as_str())
.fetch_all(pool)
.await
.map_err(|e| error_mapper(e.to_string()))?;
let actual_columns: Vec<(String, String)> = rows
.iter()
.map(|row| {
let name: String = row.get("name");
let type_: String = row.get("type");
(name, type_)
})
.collect();
for (expected_name, expected_type) in expected_columns {
let found = actual_columns
.iter()
.find(|(name, _)| name == expected_name);
match found {
Some((_, actual_type)) if actual_type.to_uppercase() == *expected_type => {
}
Some((_, actual_type)) => {
return Err(error_mapper(format!(
"Schema validation failed: Column '{expected_name}' has type '{actual_type}' but expected '{expected_type}'"
)));
}
None => {
return Err(error_mapper(format!(
"Schema validation failed: Missing column '{expected_name}'."
)));
}
}
}
Ok(())
}
fn mysql_types_compatible(actual: &str, expected: &str) -> bool {
if actual == expected {
return true;
}
matches!(
(actual, expected),
("longtext", "json") | ("json", "longtext")
)
}