use serde_json::{Map, Value, json};
use std::borrow::Cow;
use std::error::Error;
use std::process::{Command, Output};
use std::time::Instant;
use tracing::{error, instrument};
/// Splits CSV text (as produced by `psql --csv`) into records of unescaped fields.
///
/// Handles double-quoted fields, including embedded commas, embedded line
/// breaks and `""` as an escaped quote. A `\r` directly before `\n` outside
/// quotes is treated as part of the line terminator.
fn parse_csv_records(input: &str) -> Vec<Vec<String>> {
    let mut records: Vec<Vec<String>> = Vec::new();
    let mut record: Vec<String> = Vec::new();
    let mut field = String::new();
    let mut in_quotes = false;
    // True while the current record has consumed input that is not yet flushed.
    let mut pending = false;

    let mut chars = input.chars().peekable();
    while let Some(c) = chars.next() {
        pending = true;
        if in_quotes {
            if c != '"' {
                field.push(c);
            } else if chars.peek() == Some(&'"') {
                // A doubled quote inside a quoted field is a literal quote.
                chars.next();
                field.push('"');
            } else {
                in_quotes = false;
            }
            continue;
        }
        match c {
            '"' => in_quotes = true,
            ',' => record.push(std::mem::take(&mut field)),
            '\r' if chars.peek() == Some(&'\n') => {}
            '\n' => {
                record.push(std::mem::take(&mut field));
                records.push(std::mem::take(&mut record));
                pending = false;
            }
            other => field.push(other),
        }
    }

    // Flush a final record that was not terminated by a newline.
    if pending {
        record.push(field);
        records.push(record);
    }
    records
}

/// Builds the success envelope returned by [`execute_query`].
fn success_response(data: Vec<Value>, db_name: &str, started: Instant) -> Value {
    // `as_millis` yields a u128; saturate instead of silently truncating.
    let duration: u64 = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
    json!({
        "data": data,
        "db_name": db_name,
        "duration": duration,
        "message": "Successfully executed query",
        "status": "success"
    })
}

/// Runs `query` against `athena_db_name` through the `psql` command-line client
/// and returns the result set as JSON.
///
/// `psql` is invoked in CSV mode so that the first output record is the column
/// header row and values containing commas, quotes or line breaks are quoted
/// unambiguously. Each data row becomes a JSON object keyed by column name;
/// every value is a JSON string, and a row with fewer fields than the header
/// is padded with empty strings.
///
/// The returned object has the keys `data` (array of row objects), `db_name`,
/// `duration` (elapsed milliseconds), `message` and `status`.
///
/// # Errors
///
/// Returns an error if the `psql` process cannot be started, or if it exits
/// with a non-success status (the error message then carries psql's stderr).
///
/// NOTE(review): the database credential is still hard-coded here; it should
/// come from configuration or a secret store.
/// NOTE(review): `athena_db_name` is interpolated into the connection URI
/// without percent-encoding — confirm callers only pass trusted names.
/// NOTE(review): `Command::output` blocks the calling thread until `psql`
/// exits, which stalls an async executor thread; consider a blocking-task
/// pool or an async process API.
#[instrument(skip(query), fields(query_length = query.len(), db_name = %athena_db_name))]
pub async fn execute_query(query: String, athena_db_name: String) -> Result<Value, Box<dyn Error>> {
    let start_time: Instant = Instant::now();

    // The password is passed through the environment rather than inside the
    // URI so it does not show up in the process argument list.
    let connection_string: String =
        format!("postgres://postgres@db.xylex.cloud/{}", athena_db_name);

    let output: Output = Command::new("psql")
        .env("PGPASSWORD", "new_secure_password")
        // Ignore any user psqlrc so the output format is predictable.
        .arg("--no-psqlrc")
        // CSV output keeps the header row; the previous `-t` flag suppressed it.
        .arg("--csv")
        .arg("--dbname")
        .arg(&connection_string)
        .arg("-c")
        .arg(&query)
        .output()?;

    if !output.status.success() {
        let error_msg = String::from_utf8_lossy(&output.stderr);
        error!(error = %error_msg, "psql command failed");
        return Err(format!("psql execution failed: {}", error_msg).into());
    }

    let stdout: Cow<'_, str> = String::from_utf8_lossy(&output.stdout);
    let mut records = parse_csv_records(&stdout).into_iter();

    // No output at all: report success with an empty data set.
    let headers: Vec<String> = match records.next() {
        Some(header_row) => header_row,
        None => return Ok(success_response(Vec::new(), &athena_db_name, start_time)),
    };

    let results: Vec<Value> = records
        // A blank line is only a real row when the result has a single column
        // (where it represents one empty value); otherwise skip it.
        .filter(|fields| headers.len() == 1 || !(fields.len() == 1 && fields[0].is_empty()))
        .map(|fields| {
            let mut values = fields.into_iter();
            let row: Map<String, Value> = headers
                .iter()
                .map(|header| {
                    (
                        header.clone(),
                        Value::String(values.next().unwrap_or_default()),
                    )
                })
                .collect();
            Value::Object(row)
        })
        .collect();

    Ok(success_response(results, &athena_db_name, start_time))
}