//! pg-api 0.1.0
//!
//! A high-performance PostgreSQL REST API driver with rate limiting, connection pooling, and observability.
use deadpool_postgres::{Config, Pool, Runtime};
use serde_json::Value;
use tokio_postgres::{types::ToSql, NoTls, Row};
use uuid::Uuid;

use crate::models::{Account, AccountRole, AppState, DatabaseAccess, FieldMetadata, Permission, QueryResult};

/// Returns the connection pool stored under `pool_key`, building and storing a
/// new one when `state.connections` has no entry for that key.
///
/// A new pool targets the host and port of the instance referenced by
/// `account.instance_id`, authenticates with the username and password from
/// `db_access`, and connects to `target_database` without TLS.
///
/// # Errors
///
/// Returns `"Instance not found"` when `account.instance_id` has no entry in
/// `state.instances`, or the stringified error from pool creation.
pub async fn get_or_create_pool(
    state: &AppState,
    pool_key: &str,
    account: &Account,
    db_access: &DatabaseAccess,
    target_database: &str,
) -> Result<Pool, String> {
    // Fast path: hand back a clone of the pool that is already stored.
    if let Some(cached) = state.connections.get(pool_key) {
        return Ok(cached.clone());
    }

    // NOTE(review): the lookup above and the insert below are separate steps,
    // so two concurrent callers may each build a pool for the same key —
    // confirm whether that matters for this service.

    // Resolve the instance that hosts this account.
    let instances = state.instances.read().await;
    let instance = match instances.get(&account.instance_id) {
        Some(found) => found,
        None => return Err("Instance not found".to_string()),
    };

    let mut config = Config::new();
    // The caller-supplied database name is used rather than the one stored in
    // `db_access`, so wildcard access grants can reach any database.
    config.dbname = Some(target_database.to_string());
    config.user = Some(db_access.username.clone());
    config.password = Some(db_access.password.clone());
    config.host = Some(instance.host.clone());
    config.port = Some(instance.port);

    let created = config
        .create_pool(Some(Runtime::Tokio1), NoTls)
        .map_err(|e| e.to_string())?;

    // Store a clone so later calls with the same key take the fast path.
    state
        .connections
        .insert(pool_key.to_string(), created.clone());

    Ok(created)
}

/// Returns `true` for characters that can start an unquoted SQL word
/// (keyword or identifier): ASCII letters, `_`, and any non-ASCII character.
fn is_sql_word_start(c: char) -> bool {
    c.is_ascii_alphabetic() || c == '_' || !c.is_ascii()
}

/// Returns `true` for characters that can continue an unquoted SQL word.
fn is_sql_word_char(c: char) -> bool {
    is_sql_word_start(c) || c.is_ascii_digit() || c == '$'
}

/// Splits `sql` into its unquoted-word-shaped runs, ignoring everything else.
fn sql_words(sql: &str) -> Vec<&str> {
    let mut words = Vec::new();
    let mut rest = sql;
    while let Some(start) = rest.find(is_sql_word_start) {
        let tail = &rest[start..];
        let len = tail
            .find(|c: char| !is_sql_word_char(c))
            .unwrap_or(tail.len());
        words.push(&tail[..len]);
        rest = &tail[len..];
    }
    words
}

/// Checks that `permissions` allow running `query`.
///
/// The statement type is taken from the first word of the trimmed query,
/// compared ASCII-case-insensitively as a whole word (`SELECTED` is not
/// `SELECT`). `AccountRole::Owner` bypasses every check; `Permission::All`
/// satisfies any supported statement type.
///
/// A query starting with `WITH` always needs `Permission::Select`. Because a
/// `WITH` clause can wrap or precede a data-modifying statement, it also
/// needs `Insert`, `Update` and/or `Delete` for each of those keywords that
/// appears anywhere in the query as a whole word. The scan is deliberately
/// conservative and does not parse SQL: a keyword inside a string literal or
/// used as an identifier also triggers the extra requirement.
///
/// # Errors
///
/// * `"Unsupported query type"` when the leading keyword is not one of
///   `SELECT`, `WITH`, `INSERT`, `UPDATE`, `DELETE`, `CREATE`, `DROP`,
///   `TRUNCATE`.
/// * `"Missing permission: <Permission>"` naming the first required
///   permission that is absent.
pub fn validate_query_permissions(
    query: &str,
    permissions: &[Permission],
    role: &AccountRole,
) -> Result<(), String> {
    // Owner can do anything; decide before doing any string work.
    if *role == AccountRole::Owner {
        return Ok(());
    }

    let statement = query.trim();
    let keyword_len = statement
        .find(|c: char| !is_sql_word_char(c))
        .unwrap_or(statement.len());
    // Only the leading keyword is upper-cased, and only with ASCII rules, so
    // Unicode case mapping cannot turn a non-keyword into a keyword.
    let keyword = statement[..keyword_len].to_ascii_uppercase();

    let required = match keyword.as_str() {
        "SELECT" => vec![Permission::Select],
        "WITH" => {
            // NOTE(review): MERGE is not mapped to any permission here;
            // confirm whether the target servers accept it after WITH.
            let words = sql_words(statement);
            let mentions = |kw: &str| words.iter().any(|w| w.eq_ignore_ascii_case(kw));
            let mut needed = vec![Permission::Select];
            if mentions("INSERT") {
                needed.push(Permission::Insert);
            }
            if mentions("UPDATE") {
                needed.push(Permission::Update);
            }
            if mentions("DELETE") {
                needed.push(Permission::Delete);
            }
            needed
        }
        "INSERT" => vec![Permission::Insert],
        "UPDATE" => vec![Permission::Update],
        "DELETE" => vec![Permission::Delete],
        "CREATE" => vec![Permission::Create],
        "DROP" => vec![Permission::Drop],
        "TRUNCATE" => vec![Permission::Truncate],
        _ => return Err("Unsupported query type".to_string()),
    };

    if permissions.contains(&Permission::All) {
        return Ok(());
    }

    match required
        .iter()
        .find(|needed| !permissions.contains(*needed))
    {
        Some(missing) => Err(format!("Missing permission: {missing:?}")),
        None => Ok(()),
    }
}

/// Decodes the column at `idx` of `row` into a JSON value, choosing the Rust
/// type to decode into from the PostgreSQL type name.
///
/// SQL `NULL` and any value that fails to decode both become `Value::Null`.
fn column_to_json(row: &Row, idx: usize, type_name: &str) -> Value {
    let converted = match type_name {
        "bool" => row.try_get::<_, bool>(idx).ok().map(Value::Bool),
        // smallint must be read as i16; the i32 decoder only accepts int4,
        // so reading int2 through it always failed and produced null.
        "int2" => row
            .try_get::<_, i16>(idx)
            .ok()
            .map(|v| Value::Number(v.into())),
        "int4" => row
            .try_get::<_, i32>(idx)
            .ok()
            .map(|v| Value::Number(v.into())),
        "int8" => row
            .try_get::<_, i64>(idx)
            .ok()
            .map(|v| Value::Number(v.into())),
        // JSON numbers cannot represent NaN or ±Infinity; `from_f64` returns
        // `None` for them, which maps to null here instead of panicking.
        "float4" => row
            .try_get::<_, f32>(idx)
            .ok()
            .and_then(|v| serde_json::Number::from_f64(f64::from(v)))
            .map(Value::Number),
        "float8" => row
            .try_get::<_, f64>(idx)
            .ok()
            .and_then(serde_json::Number::from_f64)
            .map(Value::Number),
        "text" | "varchar" | "char" | "bpchar" => {
            row.try_get::<_, String>(idx).ok().map(Value::String)
        }
        "json" | "jsonb" => row.try_get::<_, Value>(idx).ok(),
        "timestamp" => row
            .try_get::<_, chrono::NaiveDateTime>(idx)
            .ok()
            .map(|dt| Value::String(dt.format("%Y-%m-%d %H:%M:%S").to_string())),
        // timestamptz needs a timezone-aware type; the naive decoder only
        // accepts `timestamp`. Rendered in UTC with an explicit offset.
        "timestamptz" => row
            .try_get::<_, chrono::DateTime<chrono::Utc>>(idx)
            .ok()
            .map(|dt| Value::String(dt.format("%Y-%m-%d %H:%M:%S%:z").to_string())),
        "date" => row
            .try_get::<_, chrono::NaiveDate>(idx)
            .ok()
            .map(|d| Value::String(d.format("%Y-%m-%d").to_string())),
        "uuid" => row
            .try_get::<_, Uuid>(idx)
            .ok()
            .map(|u| Value::String(u.to_string())),
        "numeric" | "decimal" => {
            // Kept as a string to preserve precision.
            // NOTE(review): the String decoder may not accept numeric
            // columns, in which case this yields null — confirm, and decode
            // through a decimal-aware type if the project has one.
            row.try_get::<_, String>(idx).ok().map(Value::String)
        }
        _ => {
            // Fallback: attempt a string decode for any other type.
            row.try_get::<_, String>(idx).ok().map(Value::String)
        }
    };

    converted.unwrap_or(Value::Null)
}

/// Converts query result rows into JSON objects keyed by column name.
///
/// Each row becomes one `Value::Object` whose keys are the column names. When
/// two columns share a name, the later column's value wins. Values that are
/// SQL `NULL`, that cannot be decoded, or that are non-finite floats are
/// emitted as JSON `null`.
pub fn rows_to_json(rows: &[Row]) -> Vec<Value> {
    rows.iter()
        .map(|row| {
            let object: serde_json::Map<String, Value> = row
                .columns()
                .iter()
                .enumerate()
                .map(|(idx, column)| {
                    (
                        column.name().to_string(),
                        column_to_json(row, idx, column.type_().name()),
                    )
                })
                .collect();

            Value::Object(object)
        })
        .collect()
}

/// Runs `query` on a connection taken from `pool` and returns the rows as
/// JSON together with per-column metadata.
///
/// The statement is prepared first so that column metadata comes from the
/// statement itself; it is therefore reported even when the query returns no
/// rows. Every field is reported as `nullable: true` with no `max_length`,
/// and `query_plan` is always `None`.
///
/// `_params` is currently not bound to the statement (parameter binding is
/// not implemented).
///
/// # Errors
///
/// Returns the stringified error when a connection cannot be obtained, or
/// when preparing or executing the statement fails. Also returns an error
/// when the prepared statement declares placeholders, since no values would
/// be bound to them.
pub async fn execute_query_with_pool(
    pool: Pool,
    query: String,
    _params: Vec<Value>,
) -> Result<QueryResult, String> {
    let client = pool.get().await.map_err(|e| e.to_string())?;

    let statement = client
        .prepare(&query)
        .await
        .map_err(|e| e.to_string())?;

    // Fail with a clear message instead of letting the driver reject the
    // parameter-count mismatch further down.
    let expected = statement.params().len();
    if expected > 0 {
        return Err(format!(
            "Query expects {expected} parameter(s), but parameter binding is not implemented"
        ));
    }

    // TODO: Implement proper parameter binding (convert `_params` to PostgreSQL types)
    let params: &[&(dyn ToSql + Sync)] = &[];

    let rows = client
        .query(&statement, params)
        .await
        .map_err(|e| e.to_string())?;

    // Column metadata comes from the prepared statement, not the first row,
    // so empty result sets still describe their columns.
    let fields = statement
        .columns()
        .iter()
        .map(|col| FieldMetadata {
            name: col.name().to_string(),
            data_type: col.type_().name().to_string(),
            nullable: true, // TODO: Get actual nullable info
            max_length: None,
        })
        .collect();

    Ok(QueryResult {
        rows: rows_to_json(&rows),
        fields,
        query_plan: None,
    })
}