use crate::domain::table_diff::RowMap;
use crate::domain::value_objects::{ColumnName, Schema, TableName};
use crate::infrastructure::db::dialect::QueryDialect;
use serde_json::Value;
pub fn build_select_query(
schema: &Schema,
table: &TableName,
pk_cols: &[ColumnName],
dialect: &dyn QueryDialect,
) -> String {
let prefix = dialect.schema_prefix(&schema.0);
let table_q = dialect.quote_ident(&table.0);
let order_cols: Vec<String> = pk_cols.iter().map(|c| dialect.quote_ident(&c.0)).collect();
if order_cols.is_empty() {
format!("SELECT * FROM {}{}", prefix, table_q)
} else {
format!(
"SELECT * FROM {}{} ORDER BY {}",
prefix,
table_q,
order_cols.join(", ")
)
}
}
pub fn build_typed_select_query(
schema: &Schema,
table: &TableName,
pk_cols: &[ColumnName],
col_types: &[(String, String)],
dialect: &dyn QueryDialect,
) -> String {
let prefix = dialect.schema_prefix(&schema.0);
let table_q = dialect.quote_ident(&table.0);
let col_exprs: Vec<String> = col_types
.iter()
.map(|(col_name, data_type)| {
let q = dialect.quote_ident(col_name);
if dialect.is_native_type(data_type) {
q
} else {
dialect.cast_to_text(&q)
}
})
.collect();
let order_cols: Vec<String> = pk_cols.iter().map(|c| dialect.quote_ident(&c.0)).collect();
if order_cols.is_empty() {
format!("SELECT {} FROM {}{}", col_exprs.join(", "), prefix, table_q)
} else {
format!(
"SELECT {} FROM {}{} ORDER BY {}",
col_exprs.join(", "),
prefix,
table_q,
order_cols.join(", ")
)
}
}
pub fn pk_key(row: &RowMap, pk_cols: &[ColumnName]) -> String {
pk_cols
.iter()
.map(|col| {
row.get(&col.0)
.map(|v| match v {
Value::String(s) => s.clone(),
other => other.to_string(),
})
.unwrap_or_else(|| "NULL".to_string())
})
.collect::<Vec<_>>()
.join("|")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::domain::value_objects::{ColumnName, Schema, TableName};
use crate::infrastructure::db::dialect::{MysqlDialect, PostgresDialect, SqliteDialect};
fn pg() -> PostgresDialect {
PostgresDialect
}
fn my() -> MysqlDialect {
MysqlDialect
}
fn sq() -> SqliteDialect {
SqliteDialect
}
#[test]
fn test_build_select_query_postgres() {
let schema = Schema("sandbox".into());
let table = TableName("pricing_rules".into());
let pks = vec![ColumnName("id".into())];
let q = build_select_query(&schema, &table, &pks, &pg());
assert_eq!(
q,
r#"SELECT * FROM "sandbox"."pricing_rules" ORDER BY "id""#
);
}
#[test]
fn test_build_select_query_mysql() {
let schema = Schema("mydb".into());
let table = TableName("rules".into());
let pks = vec![ColumnName("id".into()), ColumnName("code".into())];
let q = build_select_query(&schema, &table, &pks, &my());
assert_eq!(q, "SELECT * FROM `mydb`.`rules` ORDER BY `id`, `code`");
}
#[test]
fn test_build_select_query_sqlite() {
let schema = Schema("ignored".into());
let table = TableName("rules".into());
let pks = vec![ColumnName("id".into())];
let q = build_select_query(&schema, &table, &pks, &sq());
assert_eq!(q, r#"SELECT * FROM "rules" ORDER BY "id""#);
}
#[test]
fn test_build_select_query_no_pk_omits_order_by() {
let schema = Schema("s".into());
let table = TableName("t".into());
let pks: Vec<ColumnName> = vec![];
let q = build_select_query(&schema, &table, &pks, &pg());
assert_eq!(q, r#"SELECT * FROM "s"."t""#);
assert!(!q.contains("ORDER BY"));
}
#[test]
fn test_build_typed_select_query_postgres_casts_non_primitives() {
let schema = Schema("sandbox".into());
let table = TableName("pricing_rules".into());
let pks = vec![ColumnName("id".into())];
let col_types = vec![
("id".to_string(), "integer".to_string()),
("name".to_string(), "character varying".to_string()),
("price".to_string(), "numeric".to_string()),
("uid".to_string(), "uuid".to_string()),
("active".to_string(), "boolean".to_string()),
];
let q = build_typed_select_query(&schema, &table, &pks, &col_types, &pg());
assert!(!q.contains(r#""id"::TEXT"#));
assert!(!q.contains(r#""active"::TEXT"#));
assert!(q.contains(r#""name"::TEXT"#));
assert!(q.contains(r#""price"::TEXT"#));
assert!(q.contains(r#""uid"::TEXT"#));
assert!(q.contains(r#"ORDER BY "id""#));
}
#[test]
fn test_build_typed_select_query_mysql_uses_convert() {
let schema = Schema("source_db".into());
let table = TableName("pricing_rules".into());
let pks = vec![ColumnName("id".into())];
let col_types = vec![
("id".to_string(), "int".to_string()),
("discount_rate".to_string(), "decimal".to_string()),
("is_active".to_string(), "tinyint".to_string()),
("metadata".to_string(), "json".to_string()),
];
let q = build_typed_select_query(&schema, &table, &pks, &col_types, &my());
assert!(!q.contains("CONVERT(`id`"), "int should not be cast");
assert!(
q.contains("CONVERT(`is_active` USING utf8mb4)"),
"tinyint should now be cast"
);
assert!(
q.contains("CONVERT(`discount_rate` USING utf8mb4)"),
"{}",
q
);
assert!(q.contains("CONVERT(`metadata` USING utf8mb4)"), "{}", q);
assert!(!q.contains("::TEXT"), "{}", q);
assert!(q.contains("ORDER BY `id`"));
}
#[test]
fn test_build_typed_select_query_array_gets_text_cast() {
let schema = Schema("s".into());
let table = TableName("t".into());
let pks = vec![ColumnName("id".into())];
let col_types = vec![
("id".to_string(), "integer".to_string()),
("tags".to_string(), "ARRAY".to_string()),
];
let q = build_typed_select_query(&schema, &table, &pks, &col_types, &pg());
assert!(q.contains(r#""tags"::TEXT"#), "{}", q);
assert!(!q.contains(r#""id"::TEXT"#), "{}", q);
}
#[test]
fn test_build_typed_select_query_no_pk_omits_order_by() {
let schema = Schema("s".into());
let table = TableName("t".into());
let pks: Vec<ColumnName> = vec![];
let col_types = vec![("id".to_string(), "integer".to_string())];
let q = build_typed_select_query(&schema, &table, &pks, &col_types, &pg());
assert!(!q.contains("ORDER BY"));
}
}