use crate::entity::{IEntitySnapshot, IEntityType, IGetKeyValues};
use crate::error::LrefResult;
use crate::metadata::EntityTypeMeta;
use crate::provider::{DbValue, IAsyncConnection, IDatabaseProvider};
use std::collections::HashMap;
/// Namespace type for the functions that push entity changes to the database.
///
/// It carries no state; every operation is an associated function that takes
/// the connection and provider explicitly.
pub struct ChangeExecutor;

impl ChangeExecutor {
    /// Runs one INSERT per entity and returns how many of them reported at
    /// least one affected row.
    ///
    /// Columns that are both auto-increment and primary key are left out of
    /// the statement. Snapshot fields that are missing are bound as
    /// `DbValue::Null`. Entities with no insertable column are skipped.
    ///
    /// `on_key_backfill` is called with the entity's index in `entities` after
    /// each insert that affected at least one row.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `conn.execute`; earlier inserts are
    /// not undone here.
    pub async fn execute_inserts<E, F>(
        conn: &mut dyn IAsyncConnection,
        provider: &dyn IDatabaseProvider,
        entities: &[(&E, &EntityTypeMeta)],
        mut on_key_backfill: F,
    ) -> LrefResult<usize>
    where
        E: IEntityType + IEntitySnapshot + IGetKeyValues,
        F: FnMut(usize, i64),
    {
        let sql_gen = provider.sql_generator();
        let mut inserted = 0;
        for (idx, (entity, meta)) in entities.iter().enumerate() {
            let snap = entity.snapshot();
            let scalar_props: Vec<_> = meta.mapped_scalar_properties().collect();

            // Build the column list and the parameter list in one pass so the
            // two can never get out of step.
            let mut insert_cols: Vec<&str> = Vec::with_capacity(scalar_props.len());
            let mut params: Vec<DbValue> = Vec::with_capacity(scalar_props.len());
            for prop in &scalar_props {
                if prop.is_auto_increment && prop.is_primary_key {
                    continue;
                }
                insert_cols.push(prop.column_name.as_ref());
                params.push(
                    snap.get(prop.field_name.as_ref())
                        .cloned()
                        .unwrap_or(DbValue::Null),
                );
            }
            if insert_cols.is_empty() {
                continue;
            }

            let sql = sql_gen.insert(meta.table_name.as_ref(), &insert_cols, true);
            let rows = conn.execute(&sql, &params).await?;
            if rows > 0 {
                // NOTE(review): the value handed to the callback is the
                // affected-row count, not a database-generated key. Confirm
                // whether the connection exposes the generated id and pass
                // that instead.
                on_key_backfill(idx, rows as i64);
                inserted += 1;
            }
        }
        Ok(inserted)
    }

    /// Runs one UPDATE per entity and returns how many of them reported at
    /// least one affected row.
    ///
    /// Every non-primary-key scalar column is written from the entity's
    /// snapshot (missing fields become `DbValue::Null`); the row is selected
    /// by the entity's key values. Entities without settable columns or
    /// without key values are skipped.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `conn.execute`.
    pub async fn execute_updates<E>(
        conn: &mut dyn IAsyncConnection,
        provider: &dyn IDatabaseProvider,
        entities: &[(&E, &EntityTypeMeta)],
    ) -> LrefResult<usize>
    where
        E: IEntityType + IEntitySnapshot + IGetKeyValues,
    {
        let sql_gen = provider.sql_generator();
        let mut updated = 0;
        for (entity, meta) in entities {
            let snap = entity.snapshot();
            let keys = entity.key_values();
            let scalar_props: Vec<_> = meta.mapped_scalar_properties().collect();

            let mut set_cols: Vec<&str> = Vec::with_capacity(scalar_props.len());
            let mut params: Vec<DbValue> = Vec::with_capacity(scalar_props.len());
            for prop in &scalar_props {
                if prop.is_primary_key {
                    continue;
                }
                set_cols.push(prop.column_name.as_ref());
                params.push(
                    snap.get(prop.field_name.as_ref())
                        .cloned()
                        .unwrap_or(DbValue::Null),
                );
            }
            if set_cols.is_empty() || keys.is_empty() {
                continue;
            }

            // Parameters are bound as [SET values..., key values...], so the
            // key placeholders must be numbered after the SET placeholders.
            // Assumes the generator numbers SET placeholders from 1 - confirm.
            let first_key_index = set_cols.len() + 1;
            let where_clause = keys
                .keys()
                .enumerate()
                .map(|(i, k)| {
                    format!(
                        "{} = {}",
                        sql_gen.quote_identifier(k),
                        sql_gen.parameter_placeholder(first_key_index + i)
                    )
                })
                .collect::<Vec<String>>()
                .join(" AND ");
            let sql = sql_gen.update(meta.table_name.as_ref(), &set_cols, &where_clause);

            // Same map, same iteration order as the `keys()` walk above.
            params.extend(keys.values().cloned());

            let rows = conn.execute(&sql, &params).await?;
            if rows > 0 {
                updated += 1;
            }
        }
        Ok(updated)
    }

    /// Runs one DELETE per entity and returns how many of them reported at
    /// least one affected row.
    ///
    /// The row is selected by the entity's key values; entities without key
    /// values are skipped.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `conn.execute`.
    pub async fn execute_deletes<E>(
        conn: &mut dyn IAsyncConnection,
        provider: &dyn IDatabaseProvider,
        entities: &[(&E, &EntityTypeMeta)],
    ) -> LrefResult<usize>
    where
        E: IEntityType + IGetKeyValues,
    {
        let sql_gen = provider.sql_generator();
        let mut deleted = 0;
        for (entity, meta) in entities {
            let keys = entity.key_values();
            if keys.is_empty() {
                continue;
            }
            let where_clause = keys
                .keys()
                .enumerate()
                .map(|(i, k)| {
                    format!(
                        "{} = {}",
                        sql_gen.quote_identifier(k),
                        sql_gen.parameter_placeholder(i + 1)
                    )
                })
                .collect::<Vec<String>>()
                .join(" AND ");
            let sql = sql_gen.delete(meta.table_name.as_ref(), &where_clause);
            let params: Vec<DbValue> = keys.values().cloned().collect();
            let rows = conn.execute(&sql, &params).await?;
            if rows > 0 {
                deleted += 1;
            }
        }
        Ok(deleted)
    }
}
/// Builds the INSERT statement text for `meta`'s table, listing the column of
/// every mapped scalar property.
///
/// Returns an empty string when the entity type has no mapped scalar
/// properties. `_property_values` is accepted but not read. The generator is
/// called with `true` as its third argument; what that flag selects is decided
/// by the provider's SQL generator.
pub fn generate_insert_sql(
    provider: &dyn IDatabaseProvider,
    meta: &EntityTypeMeta,
    _property_values: &HashMap<String, DbValue>,
) -> String {
    let sql_gen = provider.sql_generator();
    let props: Vec<_> = meta.mapped_scalar_properties().collect();
    if props.is_empty() {
        return String::new();
    }
    let mut column_names: Vec<&str> = Vec::with_capacity(props.len());
    for prop in &props {
        column_names.push(prop.column_name.as_ref());
    }
    sql_gen.insert(meta.table_name.as_ref(), &column_names, true)
}
/// Builds the UPDATE statement text for `meta`'s table.
///
/// The SET list contains every key of `property_values` that is not also a
/// key of `primary_key_values`; the WHERE clause compares each primary-key
/// name against a placeholder, joined with `AND`.
///
/// Returns an empty string when there is nothing to set or no primary key to
/// select the row by.
///
/// Parameters are expected in the order `[SET values..., key values...]`, so
/// WHERE placeholders are numbered after the SET placeholders. This assumes
/// the generator numbers SET placeholders from 1 - confirm against the
/// provider's `update` implementation.
///
/// NOTE(review): the SET column order follows `property_values`' iteration
/// order, so callers binding parameters must walk the same map instance in
/// the same order.
pub fn generate_update_sql(
    provider: &dyn IDatabaseProvider,
    meta: &EntityTypeMeta,
    property_values: &HashMap<String, DbValue>,
    primary_key_values: &HashMap<String, DbValue>,
) -> String {
    let sql_gen = provider.sql_generator();
    let set_columns: Vec<&str> = property_values
        .keys()
        .filter(|k| !primary_key_values.contains_key(*k))
        .map(String::as_str)
        .collect();
    if set_columns.is_empty() || primary_key_values.is_empty() {
        return String::new();
    }

    // Key placeholders continue the numbering where the SET list stops.
    let first_key_index = set_columns.len() + 1;
    let where_clause = primary_key_values
        .keys()
        .enumerate()
        .map(|(i, k)| {
            format!(
                "{} = {}",
                sql_gen.quote_identifier(k),
                sql_gen.parameter_placeholder(first_key_index + i)
            )
        })
        .collect::<Vec<String>>()
        .join(" AND ");

    sql_gen.update(meta.table_name.as_ref(), &set_columns, &where_clause)
}
/// Builds the DELETE statement text for `meta`'s table.
///
/// The WHERE clause compares each name in `primary_key_values` against a
/// placeholder numbered from 1, joined with `AND`. Returns an empty string
/// when no primary-key values are supplied.
pub fn generate_delete_sql(
    provider: &dyn IDatabaseProvider,
    meta: &EntityTypeMeta,
    primary_key_values: &HashMap<String, DbValue>,
) -> String {
    let sql_gen = provider.sql_generator();
    if primary_key_values.is_empty() {
        return String::new();
    }

    let mut predicates: Vec<String> = Vec::with_capacity(primary_key_values.len());
    for (position, key_name) in primary_key_values.keys().enumerate() {
        let quoted = sql_gen.quote_identifier(key_name);
        let placeholder = sql_gen.parameter_placeholder(position + 1);
        predicates.push(format!("{} = {}", quoted, placeholder));
    }

    let where_clause = predicates.join(" AND ");
    sql_gen.delete(meta.table_name.as_ref(), &where_clause)
}
/// Collects the INSERT parameter list: one value per mapped scalar property of
/// `meta`, in the order the metadata yields them.
///
/// Each value is looked up in `property_values` by the property's field name;
/// a missing entry is bound as `DbValue::Null`.
pub fn collect_insert_params(
    meta: &EntityTypeMeta,
    property_values: &HashMap<String, DbValue>,
) -> Vec<DbValue> {
    let mut params = Vec::new();
    for prop in meta.mapped_scalar_properties() {
        let value = match property_values.get(prop.field_name.as_ref()) {
            Some(found) => found.clone(),
            None => DbValue::Null,
        };
        params.push(value);
    }
    params
}
/// Collects the UPDATE parameter list: the SET values first, then the
/// primary-key values.
///
/// SET values are taken for each entry of `set_keys`, in slice order, skipping
/// any key that is also a primary key; a key absent from `property_values` is
/// bound as `DbValue::Null`. The primary-key values follow in the iteration
/// order of `primary_key_values`.
pub fn collect_update_params(
    property_values: &HashMap<String, DbValue>,
    primary_key_values: &HashMap<String, DbValue>,
    set_keys: &[String],
) -> Vec<DbValue> {
    let mut params = Vec::new();
    for key in set_keys {
        if primary_key_values.contains_key(key) {
            continue;
        }
        let value = match property_values.get(key) {
            Some(found) => found.clone(),
            None => DbValue::Null,
        };
        params.push(value);
    }
    params.extend(primary_key_values.values().cloned());
    params
}
/// Collects the DELETE parameter list: a clone of every primary-key value, in
/// the iteration order of `primary_key_values`.
pub fn collect_delete_params(primary_key_values: &HashMap<String, DbValue>) -> Vec<DbValue> {
    let mut params = Vec::with_capacity(primary_key_values.len());
    for value in primary_key_values.values() {
        params.push(value.clone());
    }
    params
}