use crate::adapter::postgres::PostgresAdapter;
use crate::error::{QuickDbError, QuickDbResult};
use crate::types::DataValue;
use rat_logger::debug;
use serde_json::Value;
use sqlx::{Column, Row, TypeInfo};
use std::collections::HashMap;
pub(crate) fn row_to_data_map(
adapter: &PostgresAdapter,
row: &sqlx::postgres::PgRow,
) -> QuickDbResult<HashMap<String, DataValue>> {
let mut map = HashMap::new();
for column in row.columns() {
let column_name = column.name();
let type_name = column.type_info().name();
let data_value = match type_name {
"INT4" | "INT8" => {
if let Ok(val) = row.try_get::<Option<i32>, _>(column_name) {
match val {
Some(i) => DataValue::Int(i as i64),
None => DataValue::Null,
}
} else if let Ok(val) = row.try_get::<Option<i64>, _>(column_name) {
match val {
Some(i) => {
if column_name == "id" && i > 1000000000000000000 {
DataValue::String(i.to_string())
} else {
DataValue::Int(i)
}
}
None => DataValue::Null,
}
} else {
DataValue::Null
}
}
"FLOAT4" | "FLOAT8" => {
if let Ok(val) = row.try_get::<Option<f32>, _>(column_name) {
match val {
Some(f) => DataValue::Float(f as f64),
None => DataValue::Null,
}
} else if let Ok(val) = row.try_get::<Option<f64>, _>(column_name) {
match val {
Some(f) => DataValue::Float(f),
None => DataValue::Null,
}
} else {
DataValue::Null
}
}
"BOOL" => {
if let Ok(val) = row.try_get::<Option<bool>, _>(column_name) {
match val {
Some(b) => DataValue::Bool(b),
None => DataValue::Null,
}
} else {
DataValue::Null
}
}
"TEXT" | "VARCHAR" | "CHAR" => {
if let Ok(val) = row.try_get::<Option<String>, _>(column_name) {
match val {
Some(s) => DataValue::String(s),
None => DataValue::Null,
}
} else {
DataValue::Null
}
}
"UUID" => {
if let Ok(val) = row.try_get::<Option<uuid::Uuid>, _>(column_name) {
match val {
Some(u) => {
DataValue::String(u.to_string())
}
None => DataValue::Null,
}
} else {
DataValue::Null
}
}
"JSON" | "JSONB" => {
if let Ok(val) = row.try_get::<Option<serde_json::Value>, _>(column_name) {
match val {
Some(json_val) => {
crate::types::data_value::json_value_to_data_value(json_val)
}
None => DataValue::Null,
}
} else {
DataValue::Null
}
}
type_name if type_name.ends_with("[]") => {
if let Ok(val) = row.try_get::<Option<Vec<String>>, _>(column_name) {
match val {
Some(arr) => {
debug!(
"PostgreSQL数组字段 {} 转换为DataValue::Array,元素数量: {}",
column_name,
arr.len()
);
let data_array: Vec<DataValue> =
arr.into_iter().map(DataValue::String).collect();
DataValue::Array(data_array)
}
None => DataValue::Null,
}
} else {
debug!(
"PostgreSQL数组字段 {} 无法作为字符串数组读取,尝试作为JSON",
column_name
);
if let Ok(val) = row.try_get::<Option<serde_json::Value>, _>(column_name) {
match val {
Some(json_val) => {
debug!(
"PostgreSQL数组字段 {} 作为JSON处理: {:?}",
column_name, json_val
);
crate::types::data_value::json_value_to_data_value(json_val)
}
None => DataValue::Null,
}
} else {
debug!("PostgreSQL数组字段 {} 读取失败,设置为Null", column_name);
DataValue::Null
}
}
}
"timestamp without time zone" | "TIMESTAMP" | "TIMESTAMPTZ" => {
if let Ok(val) =
row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(column_name)
{
match val {
Some(dt) => {
DataValue::DateTime(dt.with_timezone(&chrono::FixedOffset::east(0)))
}
None => DataValue::Null,
}
} else if let Ok(val) = row.try_get::<Option<chrono::NaiveDateTime>, _>(column_name)
{
match val {
Some(ndt) => {
let utc_dt = ndt.and_utc();
DataValue::DateTime(utc_dt.with_timezone(&chrono::FixedOffset::east(0)))
}
None => DataValue::Null,
}
} else {
DataValue::Null
}
}
_ => {
if let Ok(val) = row.try_get::<Option<String>, _>(column_name) {
match val {
Some(s) => DataValue::String(s),
None => DataValue::Null,
}
} else {
DataValue::Null
}
}
};
map.insert(column_name.to_string(), data_value);
}
Ok(map)
}
pub(crate) fn row_to_json(
adapter: &PostgresAdapter,
row: &sqlx::postgres::PgRow,
) -> QuickDbResult<Value> {
let data_map = row_to_data_map(adapter, row)?;
let mut json_map = serde_json::Map::new();
for (key, value) in data_map {
json_map.insert(key, value.to_json_value());
}
Ok(Value::Object(json_map))
}
pub(crate) async fn execute_query(
adapter: &PostgresAdapter,
pool: &sqlx::Pool<sqlx::Postgres>,
sql: &str,
params: &[DataValue],
table: &str,
) -> QuickDbResult<Vec<DataValue>> {
let mut query = sqlx::query(sql);
for param in params {
query = match param {
DataValue::String(s) => {
match s.parse::<uuid::Uuid>() {
Ok(uuid) => query.bind(uuid), Err(_) => query.bind(s), }
}
DataValue::Int(i) => query.bind(*i),
DataValue::UInt(u) => {
if *u <= i64::MAX as u64 {
query.bind(*u as i64)
} else {
query.bind(u.to_string())
}
}
DataValue::Float(f) => query.bind(*f),
DataValue::Bool(b) => query.bind(*b),
DataValue::DateTime(dt) => query.bind(*dt),
DataValue::DateTimeUTC(dt) => {
query.bind(dt.with_timezone(&chrono::FixedOffset::east(0)))
}
DataValue::Uuid(uuid) => query.bind(*uuid),
DataValue::Json(json) => query.bind(json),
DataValue::Bytes(bytes) => query.bind(bytes.as_slice()),
DataValue::Null => query.bind(Option::<String>::None),
DataValue::Array(arr) => {
let json_array = DataValue::Array(arr.clone()).to_json_value();
query.bind(json_array)
}
DataValue::Object(obj) => {
let json_object = DataValue::Object(obj.clone()).to_json_value();
query.bind(json_object)
}
};
}
let rows = query
.fetch_all(pool)
.await
.map_err(|e| {
let error_string = e.to_string().to_lowercase();
if error_string.contains("relation") && error_string.contains("does not exist") ||
error_string.contains(&format!("relation \"{}\" does not exist", table.to_lowercase())) ||
error_string.contains("table") && error_string.contains("doesn't exist") ||
error_string.contains(&format!("table \"{}\" doesn't exist", table.to_lowercase())) ||
error_string.contains("relation") && error_string.contains("unknown") {
QuickDbError::TableNotExistError {
table: table.to_string(),
message: format!("PostgreSQL表 '{}' 不存在", table),
}
} else {
QuickDbError::QueryError {
message: format!("执行PostgreSQL查询失败: {}", e),
}
}
})?;
let mut results = Vec::new();
for row in rows {
let data_map = row_to_data_map(adapter, &row)?;
results.push(DataValue::Object(data_map));
}
Ok(results)
}
pub(crate) async fn execute_update(
adapter: &PostgresAdapter,
pool: &sqlx::Pool<sqlx::Postgres>,
sql: &str,
params: &[DataValue],
table: &str,
) -> QuickDbResult<u64> {
rat_logger::debug!("🔍 PostgreSQL execute_update: SQL={}", sql);
let mut query = sqlx::query(sql);
for (i, param) in params.iter().enumerate() {
rat_logger::debug!("🔍 PostgreSQL execute_update: 参数[{}] = {:?}", i, param);
query = match param {
DataValue::String(s) => {
match s.parse::<uuid::Uuid>() {
Ok(uuid) => {
rat_logger::debug!("🔍 PostgreSQL: 字符串 '{}' 成功解析为UUID", s);
query.bind(uuid) }
Err(_) => {
rat_logger::debug!(
"🔍 PostgreSQL: 字符串 '{}' 不是有效UUID,作为字符串处理",
s
);
query.bind(s) }
}
}
DataValue::Int(i) => query.bind(*i),
DataValue::UInt(u) => {
if *u <= i64::MAX as u64 {
query.bind(*u as i64)
} else {
query.bind(u.to_string())
}
}
DataValue::Float(f) => query.bind(*f),
DataValue::Bool(b) => query.bind(*b),
DataValue::DateTime(dt) => query.bind(*dt),
DataValue::DateTimeUTC(dt) => {
query.bind(dt.with_timezone(&chrono::FixedOffset::east(0)))
}
DataValue::Uuid(uuid) => query.bind(*uuid),
DataValue::Json(json) => query.bind(json),
DataValue::Bytes(bytes) => query.bind(bytes.as_slice()),
DataValue::Null => query.bind(Option::<String>::None),
DataValue::Array(arr) => query.bind(serde_json::to_value(arr).unwrap_or_default()),
DataValue::Object(obj) => query.bind(serde_json::to_value(obj).unwrap_or_default()),
};
}
let result = query
.execute(pool)
.await
.map_err(|e| {
let error_string = e.to_string().to_lowercase();
if error_string.contains("relation") && error_string.contains("does not exist") ||
error_string.contains(&format!("relation \"{}\" does not exist", table.to_lowercase())) ||
error_string.contains("table") && error_string.contains("doesn't exist") ||
error_string.contains(&format!("table \"{}\" doesn't exist", table.to_lowercase())) ||
error_string.contains("relation") && error_string.contains("unknown") {
QuickDbError::TableNotExistError {
table: table.to_string(),
message: format!("PostgreSQL表 '{}' 不存在", table),
}
} else {
QuickDbError::QueryError {
message: format!("执行PostgreSQL更新失败: {}", e),
}
}
})?;
Ok(result.rows_affected())
}