use deadpool_postgres::{Config, Pool, Runtime};
use serde_json::Value;
use tokio_postgres::{types::ToSql, NoTls, Row};
use uuid::Uuid;
use crate::models::{Account, AccountRole, AppState, DatabaseAccess, FieldMetadata, Permission, QueryResult};
pub async fn get_or_create_pool(
state: &AppState,
pool_key: &str,
account: &Account,
db_access: &DatabaseAccess,
target_database: &str,
) -> Result<Pool, String> {
if let Some(pool) = state.connections.get(pool_key) {
return Ok(pool.clone());
}
let instances = state.instances.read().await;
let instance = instances.get(&account.instance_id)
.ok_or("Instance not found")?;
let mut config = Config::new();
config.host = Some(instance.host.clone());
config.port = Some(instance.port);
config.user = Some(db_access.username.clone());
config.password = Some(db_access.password.clone());
config.dbname = Some(target_database.to_string());
let pool = config.create_pool(Some(Runtime::Tokio1), NoTls)
.map_err(|e| e.to_string())?;
state.connections.insert(pool_key.to_string(), pool.clone());
Ok(pool)
}
pub fn validate_query_permissions(
query: &str,
permissions: &[Permission],
role: &AccountRole,
) -> Result<(), String> {
let query_upper = query.to_uppercase();
if *role == AccountRole::Owner {
return Ok(());
}
let required_permission = if query_upper.trim().starts_with("SELECT") || query_upper.trim().starts_with("WITH") {
Permission::Select
} else if query_upper.trim().starts_with("INSERT") {
Permission::Insert
} else if query_upper.trim().starts_with("UPDATE") {
Permission::Update
} else if query_upper.trim().starts_with("DELETE") {
Permission::Delete
} else if query_upper.trim().starts_with("CREATE") {
Permission::Create
} else if query_upper.trim().starts_with("DROP") {
Permission::Drop
} else if query_upper.trim().starts_with("TRUNCATE") {
Permission::Truncate
} else {
return Err("Unsupported query type".to_string());
};
if permissions.contains(&Permission::All) || permissions.contains(&required_permission) {
Ok(())
} else {
Err(format!("Missing permission: {required_permission:?}"))
}
}
pub fn rows_to_json(rows: &[Row]) -> Vec<Value> {
rows.iter().map(|row| {
let mut obj = serde_json::Map::new();
for (i, column) in row.columns().iter().enumerate() {
let value = match column.type_().name() {
"bool" => row.try_get::<_, bool>(i).ok().map(Value::Bool),
"int2" | "int4" => row.try_get::<_, i32>(i).ok().map(|v| Value::Number(v.into())),
"int8" => row.try_get::<_, i64>(i).ok().map(|v| Value::Number(v.into())),
"float4" => row.try_get::<_, f32>(i).ok().map(|v| Value::Number(serde_json::Number::from_f64(v as f64).unwrap())),
"float8" => row.try_get::<_, f64>(i).ok().map(|v| Value::Number(serde_json::Number::from_f64(v).unwrap())),
"text" | "varchar" | "char" | "bpchar" => {
row.try_get::<_, String>(i).ok().map(Value::String)
}
"json" | "jsonb" => row.try_get::<_, Value>(i).ok(),
"timestamp" | "timestamptz" => {
row.try_get::<_, chrono::NaiveDateTime>(i)
.ok()
.map(|dt| Value::String(dt.format("%Y-%m-%d %H:%M:%S").to_string()))
}
"date" => {
row.try_get::<_, chrono::NaiveDate>(i)
.ok()
.map(|d| Value::String(d.format("%Y-%m-%d").to_string()))
}
"uuid" => row.try_get::<_, Uuid>(i).ok().map(|u| Value::String(u.to_string())),
"numeric" | "decimal" => {
row.try_get::<_, String>(i).ok().map(Value::String)
}
_ => {
row.try_get::<_, String>(i).ok().map(Value::String)
}
};
obj.insert(column.name().to_string(), value.unwrap_or(Value::Null));
}
Value::Object(obj)
}).collect()
}
pub async fn execute_query_with_pool(
pool: Pool,
query: String,
_params: Vec<Value>,
) -> Result<QueryResult, String> {
let client = pool.get().await.map_err(|e| e.to_string())?;
let params: Vec<&(dyn ToSql + Sync)> = vec![];
let rows = client.query(&query, ¶ms).await
.map_err(|e| e.to_string())?;
let fields = if !rows.is_empty() {
rows[0].columns().iter().map(|col| {
FieldMetadata {
name: col.name().to_string(),
data_type: col.type_().name().to_string(),
nullable: true, max_length: None,
}
}).collect()
} else {
vec![]
};
let json_rows = rows_to_json(&rows);
Ok(QueryResult {
rows: json_rows,
fields,
query_plan: None,
})
}