use crate::storage::Error;
use crate::storage::config_store::adapters::postgres::schema::{Field, Table};
use change_case::snake_case;
use pbbson::Model;
use pbbson::bson::Bson;
use pbbson::bson::spec::BinarySubtype;
use serde_json::Value;
use sqlx::{Execute, Pool, Postgres, QueryBuilder};
use std::time::Duration;
pub(super) async fn perform(pool: &Pool<Postgres>, table: &Table, model: Model) -> Result<(), Error> {
let mut qb: QueryBuilder<Postgres> = QueryBuilder::new("INSERT INTO ");
qb.push(&table.name);
qb.push("(");
let mut field_num = 0;
for (k, _v) in model.iter() {
let field = match find_field(table, k) {
Some(field) => field,
None => {
log::error!(field = k; "No db column found for field");
continue;
}
};
field_num += 1;
if field_num > 1 {
qb.push(", ");
}
qb.push(field.column_name);
}
qb.push(")");
qb.push(" VALUES (");
let mut field_num = 0;
for (k, v) in model.iter() {
let field = match find_field(table, k) {
Some(field) => field,
None => continue,
};
field_num += 1;
if field_num > 1 {
qb.push(", ");
}
bind(&field, v, &mut qb)?;
}
qb.push(")");
let query = qb.build();
let sql = query.sql();
log::debug!(sql; "Inserting");
query.execute(pool).await.map_err(|e| Error::internal(e.to_string()))?;
Ok(())
}
pub(crate) fn find_field(table: &Table, field_name: &str) -> Option<Field> {
let field_name_snake_case = snake_case(field_name);
if let Some(field) = table.fields_by_name.get(&field_name_snake_case) {
Some(field.clone())
} else {
table.fields_by_name.iter().find_map(|(k, v)| {
if k.replace("_", "") == field_name_snake_case.replace("_", "") {
Some(v.clone())
} else {
None
}
})
}
}
pub(crate) fn bind<'a>(field: &Field, v: &'a Bson, qb: &mut QueryBuilder<'a, Postgres>) -> Result<(), Error> {
match field.data_type.as_str() {
"ARRAY" => {
if let Some(v_array) = v.as_array() {
let mut value: Vec<String> = vec![];
for v in v_array {
value.push(v.to_string());
}
qb.push_bind(value);
}
}
"bigint" => match v {
Bson::Int32(n) => {
qb.push_bind(n);
}
Bson::Int64(n) => {
qb.push_bind(n);
}
Bson::String(s) => {
if let Ok(n) = s.parse::<i64>() {
qb.push_bind(n);
} else {
let value = format!("'{v}'::{}", field.data_type.as_str());
qb.push_bind(value);
}
}
_ => {
qb.push_bind(v.to_string());
}
},
"boolean" => match v {
Bson::Boolean(b) => {
qb.push_bind(b);
}
_ => {
qb.push_bind(v.to_string());
}
},
"character" | "character varying" | "text" => match v {
Bson::Binary(binary) => match binary.subtype {
BinarySubtype::Uuid => match binary.to_uuid() {
Ok(uuid) => {
qb.push_bind(uuid.to_string());
}
Err(_e) => {
qb.push_bind(binary.to_string());
}
},
_ => {
qb.push_bind(binary.to_string());
}
},
Bson::ObjectId(object_id) => {
if let Ok(id) = xid::Id::from_bytes(&object_id.bytes()) {
qb.push_bind(id.to_string());
} else {
qb.push_bind(v.to_string());
}
}
Bson::String(v) => {
qb.push_bind(v);
}
_ => {
qb.push_bind(v.to_string());
}
},
"integer" => match v {
Bson::Int32(n) => {
qb.push_bind(n);
}
Bson::Int64(n) => {
qb.push_bind(n);
}
Bson::String(s) => {
if let Ok(n) = s.parse::<i64>() {
qb.push_bind(n);
} else {
let value = format!("'{s}'::{}", field.data_type.as_str());
qb.push(value);
}
}
_ => {
qb.push_bind(v.to_string());
}
},
"interval" => match v {
Bson::String(s) => {
match s.strip_suffix('s') {
Some(s) => match s.parse::<f64>() {
Ok(v) => qb.push_bind(Duration::from_micros((v * 1e9) as u64)),
Err(_e) => qb.push_bind(s),
},
None => match s.parse::<f64>() {
Ok(v) => qb.push_bind(Duration::from_micros((v * 1e9) as u64)),
Err(_e) => qb.push_bind(s),
},
};
}
_ => {
qb.push_bind(v.to_string());
}
},
"json" => match v {
Bson::Document(doc) => {
if let Ok(doc_as_json) = serde_json::to_string(doc)
&& let Ok(v) = serde_json::from_str::<Value>(&doc_as_json)
{
qb.push_bind(v);
}
}
_ => {
qb.push_bind(v.to_string());
}
},
"numeric" => match v {
Bson::Double(n) => {
qb.push_bind(n);
}
_ => {
let value = format!("'{v}'::{}", field.data_type.as_str());
qb.push(value);
}
},
"real" | "double precision" => match v {
Bson::Double(n) => {
qb.push_bind(n);
}
_ => {
let value = format!("'{v}'::{}", field.data_type.as_str());
qb.push(value);
}
},
"smallint" => match v {
Bson::Int32(n) => {
qb.push_bind(n);
}
Bson::Int64(n) => {
qb.push_bind(n);
}
_ => {
let value = format!("'{v}'::{}", field.data_type.as_str());
qb.push(value);
}
},
"timestamp with time zone" => match v {
Bson::DateTime(dt) => {
if let Ok(s) = dt.try_to_rfc3339_string() {
let value = format!("'{s}'::timestamp");
qb.push(value);
} else {
qb.push_bind(v.to_string());
}
}
_ => {
qb.push_bind(v.to_string());
}
},
"uuid" => match v {
Bson::Binary(binary) => match binary.subtype {
BinarySubtype::Uuid => {
if let Ok(uuid_) = binary.to_uuid() {
let bytes = uuid_.bytes();
if let Ok(uuid) = uuid::Uuid::from_slice(&bytes) {
qb.push_bind(uuid);
}
}
}
_ => {
qb.push_bind(binary.to_string());
}
},
_ => {
qb.push_bind(v.to_string());
}
},
_ => {
qb.push_bind(v.to_string());
}
}
Ok(())
}