use sqlx::{
postgres::{PgArguments, PgRow},
query::Query,
Column, Executor, FromRow, Postgres, Row, TypeInfo,
};
use crate::{
operations::serialize::{GranularOperation, OperationNotification},
queries::serialize::{FinalType, QueryData, QueryTree, ReturnType},
utils::{
delete_statement, insert_many_statement, insert_statement, ordered_keys,
to_numbered_placeholders, update_statement,
},
};
use super::prepare_sqlx_query;
/// Binds one [`FinalType`] value as the next positional parameter of a Postgres
/// query and returns the extended query.
///
/// * `Null` is bound as `None::<String>`.
/// * `Number` is bound as `i64` when it is a non-float value that fits in `i64`,
///   and as `f64` otherwise. Presumably the only non-float values that miss the
///   `i64` range are unsigned integers above `i64::MAX` — confirm against the
///   number type wrapped by `FinalType::Number`.
/// * `String` and `Bool` are bound unchanged.
///
/// # Panics
///
/// Panics if a number that is not bound as `i64` cannot be represented as `f64`.
#[inline]
pub fn bind_postgres_value<'q>(
    query: Query<'q, Postgres, PgArguments>,
    value: FinalType,
) -> Query<'q, Postgres, PgArguments> {
    match value {
        // NOTE(review): the NULL is bound with a string type; confirm this is
        // acceptable for parameters that target non-text columns.
        FinalType::Null => query.bind(None::<String>),
        FinalType::Number(number) => match number.as_i64() {
            Some(integer) if !number.is_f64() => query.bind(integer),
            // Floats land here, and so do integers outside the `i64` range,
            // which previously panicked on `as_i64().unwrap()`.
            _ => query.bind(number.as_f64().unwrap()),
        },
        FinalType::String(string) => query.bind(string),
        FinalType::Bool(flag) => query.bind(flag),
    }
}
pub async fn fetch_postgres_query<'a, E>(query: &QueryTree, executor: E) -> QueryData<PgRow>
where
E: Executor<'a, Database = Postgres>,
{
let (sql, values) = prepare_sqlx_query(&query);
let with_placeholders = to_numbered_placeholders(&sql);
let mut sqlx_query = sqlx::query(&with_placeholders);
for value in values {
sqlx_query = bind_postgres_value(sqlx_query, value);
}
match query.return_type {
ReturnType::Single => {
let row = sqlx_query.fetch_optional(executor).await.unwrap();
return QueryData::Single(row);
}
ReturnType::Many => {
let rows = sqlx_query.fetch_all(executor).await.unwrap();
return QueryData::Many(rows);
}
}
}
/// Converts one Postgres row into a JSON object keyed by column name.
///
/// Each column is decoded according to the type name sqlx reports for it:
///
/// * `INT2`, `INT4`, `INT8` become JSON integers.
/// * `FLOAT4`, `FLOAT8` become JSON floats.
/// * `BOOL` becomes a JSON boolean.
/// * `TEXT`, `VARCHAR`, `CHAR`, `NAME` become JSON strings.
///
/// SQL `NULL` values, values that fail to decode, and columns of any other
/// type are emitted as JSON `null`, so every column name is always present.
pub fn postgres_row_to_json(row: &PgRow) -> serde_json::Value {
    let mut json_map = serde_json::Map::new();
    for column in row.columns() {
        let column_name = column.name();
        // sqlx reports Postgres type names (`INT4`, `BOOL`, ...). The
        // SQLite-style names matched here before (`INTEGER`, `REAL`,
        // `BOOLEAN`, ...) are never reported for Postgres columns, so integer,
        // float and boolean columns always came out as `null`.
        let value = match column.type_info().name() {
            // Integer widths are decoded separately because sqlx checks that
            // the Rust type matches the column's exact SQL type.
            "INT2" => row
                .try_get::<i16, _>(column_name)
                .ok()
                .map(serde_json::Value::from),
            "INT4" => row
                .try_get::<i32, _>(column_name)
                .ok()
                .map(serde_json::Value::from),
            "INT8" => row
                .try_get::<i64, _>(column_name)
                .ok()
                .map(serde_json::Value::from),
            "FLOAT4" => row
                .try_get::<f32, _>(column_name)
                .ok()
                .map(serde_json::Value::from),
            "FLOAT8" => row
                .try_get::<f64, _>(column_name)
                .ok()
                .map(serde_json::Value::from),
            "BOOL" => row
                .try_get::<bool, _>(column_name)
                .ok()
                .map(serde_json::Value::from),
            "TEXT" | "VARCHAR" | "CHAR" | "NAME" => row
                .try_get::<String, _>(column_name)
                .ok()
                .map(serde_json::Value::from),
            // NUMERIC, DATE, TIME, BYTEA and the remaining types are not
            // decoded here and are reported as `null`.
            // NOTE(review): decoding them presumably needs optional sqlx
            // features (decimal / date-time support) — confirm before adding.
            _ => None,
        };
        json_map.insert(
            column_name.to_string(),
            value.unwrap_or(serde_json::Value::Null),
        );
    }
    serde_json::Value::Object(json_map)
}
pub fn postgres_rows_to_json(rows: &[PgRow]) -> serde_json::Value {
let mut json_array = Vec::new();
for row in rows {
json_array.push(postgres_row_to_json(row));
}
serde_json::Value::Array(json_array)
}
/// Function-pointer type that takes borrowed query data (`&QueryData<PgRow>`)
/// plus a `table` name and returns a `serde_json::Value`.
///
/// NOTE(review): nothing in this part of the file uses the alias; presumably it
/// is the hook callers supply to serialize fetched rows for a table — confirm.
pub type SerializeRowsMapped = fn(&QueryData<PgRow>, table: &str) -> serde_json::Value;
/// Executes one [`GranularOperation`] against Postgres and returns the matching
/// [`OperationNotification`], built from the row(s) the statement returns.
///
/// * `Create` – binds the entry's values in `ordered_keys` order and maps the
///   single returned row to `T`.
/// * `CreateMany` – takes the column keys from the first entry, binds every
///   entry's values in that order and maps all returned rows to `T`. An empty
///   `data` list returns a notification with no rows without running a query.
/// * `Update` – binds the new values followed by `id`; returns `None` when the
///   statement returns no row.
/// * `Delete` – binds `id`; returns `None` when the statement returns no row.
///
/// # Panics
///
/// Panics if the database call fails, if a returned row cannot be converted to
/// `T`, if a value cannot be converted to [`FinalType`], or if an entry lacks
/// one of the keys being bound (for `CreateMany`, the keys of the first entry).
pub async fn granular_operation_postgres<'a, E, T>(
    operation: GranularOperation,
    executor: E,
) -> Option<OperationNotification<T>>
where
    E: Executor<'a, Database = Postgres>,
    T: for<'r> FromRow<'r, PgRow>,
{
    match operation {
        GranularOperation::Create { table, mut data } => {
            let keys = ordered_keys(&data);
            let string_query = insert_statement(&table, &keys);
            let numbered_query = to_numbered_placeholders(&string_query);
            let mut sqlx_query = sqlx::query(&numbered_query);
            for key in keys.iter() {
                let value = data.remove(key).unwrap();
                let native_value = FinalType::try_from(value).unwrap();
                sqlx_query = bind_postgres_value(sqlx_query, native_value);
            }
            let row = sqlx_query.fetch_one(executor).await.unwrap();
            let data = T::from_row(&row).unwrap();
            Some(OperationNotification::Create {
                table: table.to_string(),
                data,
            })
        }
        GranularOperation::CreateMany { table, mut data } => {
            // Nothing to insert: indexing `data[0]` below would panic, so
            // report an empty result instead.
            if data.is_empty() {
                return Some(OperationNotification::CreateMany {
                    table: table.to_string(),
                    data: Vec::new(),
                });
            }
            // The first entry decides which columns are inserted for every row.
            let keys = ordered_keys(&data[0]);
            let string_query = insert_many_statement(&table, &keys, data.len());
            let numbered_query = to_numbered_placeholders(&string_query);
            let mut sqlx_query = sqlx::query(&numbered_query);
            for entry in data.iter_mut() {
                for key in keys.iter() {
                    let value = entry.remove(key).unwrap();
                    let native_value = FinalType::try_from(value).unwrap();
                    sqlx_query = bind_postgres_value(sqlx_query, native_value);
                }
            }
            let rows = sqlx_query.fetch_all(executor).await.unwrap();
            let data: Vec<T> = rows
                .into_iter()
                .map(|row| T::from_row(&row).unwrap())
                .collect();
            Some(OperationNotification::CreateMany {
                table: table.to_string(),
                data,
            })
        }
        GranularOperation::Update {
            table,
            id,
            mut data,
        } => {
            let keys = ordered_keys(&data);
            let string_query = update_statement(&table, &keys);
            let numbered_query = to_numbered_placeholders(&string_query);
            let mut sqlx_query = sqlx::query(&numbered_query);
            for key in keys.iter() {
                let value = data.remove(key).unwrap();
                let native_value = FinalType::try_from(value).unwrap();
                sqlx_query = bind_postgres_value(sqlx_query, native_value);
            }
            // The id is bound last, after all column values.
            sqlx_query = bind_postgres_value(sqlx_query, id.clone());
            // `?` returns `None` when the statement produced no row.
            let row = sqlx_query.fetch_optional(executor).await.unwrap()?;
            let data = T::from_row(&row).unwrap();
            Some(OperationNotification::Update {
                table: table.to_string(),
                id,
                data,
            })
        }
        GranularOperation::Delete { table, id } => {
            let string_query = delete_statement(&table);
            let numbered_query = to_numbered_placeholders(&string_query);
            let sqlx_query = bind_postgres_value(sqlx::query(&numbered_query), id.clone());
            // `?` returns `None` when the statement produced no row.
            let row = sqlx_query.fetch_optional(executor).await.unwrap()?;
            let data = T::from_row(&row).unwrap();
            Some(OperationNotification::Delete {
                table: table.to_string(),
                id,
                data,
            })
        }
    }
}