athena_rs 1.1.0

Database gateway API
Documentation
//! Execute queries against PostgreSQL and map rows into JSON.
//!
//! Connection string is built from the provided `athena_db_name`.
use serde_json::{Map, Value, json};
use std::borrow::Cow;
use std::error::Error;
use std::process::{Command, Output};
use std::time::Instant;

use tracing::{error, instrument};

#[instrument(skip(query), fields(query_length = query.len(), db_name = %athena_db_name))]
/// Execute a SQL query against a PostgreSQL database and return the raw result.
pub async fn execute_query(query: String, athena_db_name: String) -> Result<Value, Box<dyn Error>> {
    let start_time: Instant = Instant::now();
    let connection_string: String = format!(
        "postgres://postgres:new_secure_password@db.xylex.cloud/{}",
        athena_db_name
    );
    let output: Output = Command::new("psql")
        .arg(&connection_string)
        .arg("-t")
        .arg("-c")
        .arg(&query)
        .output()?;

    if !output.status.success() {
        let error_msg = String::from_utf8_lossy(&output.stderr);
        error!(error = %error_msg, "psql command failed");
        return Err(format!("psql execution failed: {}", error_msg).into());
    }

    let stdout: Cow<'_, str> = String::from_utf8_lossy(&output.stdout);
    let lines: Vec<&str> = stdout.lines().collect();

    if lines.is_empty() {
        let duration: u64 = start_time.elapsed().as_millis() as u64;
        return Ok(json!({
            "data": [],
            "db_name": athena_db_name,
            "duration": duration,
            "message": "Successfully executed query",
            "status": "success"
        }));
    }

    let mut results: Vec<Value> = Vec::new();
    let headers: Vec<&str> = lines[0].split(',').collect();

    for line in lines.iter().skip(1) {
        if line.trim().is_empty() {
            continue;
        }

        let values: Vec<&str> = line.split(',').collect();
        let mut row_data: Map<String, Value> = Map::new();

        for (i, header) in headers.iter().enumerate() {
            let value: String = values.get(i).unwrap_or(&"").to_string();
            row_data.insert(header.to_string(), Value::String(value));
        }

        results.push(Value::Object(row_data));
    }

    let duration: u64 = start_time.elapsed().as_millis() as u64;

    Ok(json!({
        "data": results,
        "db_name": athena_db_name,
        "duration": duration,
        "message": "Successfully executed query",
        "status": "success"
    }))
}