use crate::storage::Error;
use crate::storage::config_store::adapters::postgres::schema::Table;
use change_case::camel_case;
use num_traits::cast::ToPrimitive;
use pbbson::Model;
use pbbson::bson::spec::BinarySubtype;
use pbbson::bson::{self, Bson, DateTime, binary};
use sqlx::postgres::{PgRow, types::PgInterval};
use sqlx::types::Decimal;
use sqlx::{
Column, Row, ValueRef,
types::chrono::{self, Utc},
};
use std::time::{Duration, SystemTime};
pub(crate) fn deserialize(row: PgRow, table: &Table) -> Result<Model, Error> {
let mut model = Model::default();
for (index, column) in row.columns().iter().enumerate() {
let column_name = column.name();
let value = row.try_get_raw(index).map_err(|e| Error::internal(e.to_string()))?;
if value.is_null() {
continue;
}
let field = match table.fields_by_name.get(column_name) {
None => continue,
Some(field) => field,
};
match field.data_type.as_str() {
"ARRAY" => {
let vec: Vec<String> = row.get(index);
model.insert(
camel_case(column_name),
Bson::Array(vec.into_iter().map(Bson::String).collect()),
);
}
"bigint" => {
let value: i64 = row.get(index);
model.insert(camel_case(column_name), Bson::String(format!("{value}")));
}
"character varying" => {
if let Ok(value_str) = value.as_str() {
model.insert(camel_case(column_name), Bson::String(value_str.to_string()));
}
}
"double precision" => {
let value: f64 = row.get(index);
model.insert(camel_case(column_name), Bson::Double(value));
}
"integer" => {
let value: i32 = row.get(index);
model.insert(camel_case(column_name), Bson::Int32(value));
}
"interval" => {
let interval: PgInterval = row.get(index);
let dur: Duration = Duration::from_nanos((interval.microseconds) as u64);
model.insert(camel_case(column_name), Bson::String(format!("{dur:?}")));
}
"json" => {
if let Ok(value_str) = value.as_str()
&& let Ok(value_as_json) = serde_json::from_str::<serde_json::Value>(value_str)
&& let Ok(doc) = bson::serialize_to_document(&value_as_json)
{
model.insert(camel_case(column_name), Bson::Document(doc));
}
}
"numeric" => {
let value: Decimal = row.get(index);
if let Some(value) = value.to_f64() {
model.insert(camel_case(column_name), Bson::Double(value));
}
}
"real" => {
let value: f32 = row.get(index);
model.insert(camel_case(column_name), Bson::Double(value as f64));
}
"smallint" => {
let value: i16 = row.get(index);
model.insert(camel_case(column_name), Bson::Int32(value as i32));
}
"timestamp with time zone" => {
let chrono_dt: chrono::DateTime<Utc> = row.get(index);
let system_time: SystemTime = chrono_dt.into();
let dt = DateTime::from_system_time(system_time);
model.insert(camel_case(column_name), Bson::DateTime(dt));
}
"uuid" => {
let value: uuid::Uuid = row.get(index);
let bytes = value.as_bytes().to_vec();
model.insert(
camel_case(column_name),
Bson::Binary(binary::Binary {
subtype: BinarySubtype::Uuid,
bytes,
}),
);
}
_ => {
log::debug!("Unsupported datatype: {}", field.data_type.as_str());
}
}
}
Ok(model)
}