use crate::{recordset::ColumnInfo, DatabaseError, DatabaseResult, Recordset};
use serde_json::Value as JsonValue;
use tokio_postgres::{Client, Config, NoTls};
/// A handle to a PostgreSQL database built on a `tokio_postgres` client.
///
/// The client is held in an `Option` so the handle can outlive the
/// connection: `close` sets it to `None`, after which the query methods
/// report `DatabaseError::NotConnected`.
#[derive(Debug)]
pub struct PostgresConnection {
/// Underlying driver client; `None` once the connection has been closed.
client: Option<Client>,
}
impl PostgresConnection {
pub async fn connect(connection_string: &str) -> DatabaseResult<Self> {
let config: Config = connection_string.parse().map_err(|e| {
DatabaseError::connection_error(format!("Invalid connection string: {}", e))
})?;
let (client, connection) = config.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("PostgreSQL connection error: {}", e);
}
});
Ok(Self {
client: Some(client),
})
}
pub async fn execute(&self, query: &str) -> DatabaseResult<Recordset> {
let client = self.client.as_ref().ok_or(DatabaseError::NotConnected)?;
let rows = client.query(query, &[]).await?;
if rows.is_empty() {
return Ok(Recordset::empty());
}
let columns: Vec<ColumnInfo> = rows[0]
.columns()
.iter()
.enumerate()
.map(|(i, col)| ColumnInfo {
name: col.name().to_string(),
data_type: format!("{:?}", col.type_()),
ordinal: i,
})
.collect();
let mut data_rows = Vec::new();
for row in rows {
let mut row_data = Vec::new();
for i in 0..row.len() {
let value = postgres_value_to_json(&row, i)?;
row_data.push(value);
}
data_rows.push(row_data);
}
Ok(Recordset::new(columns, data_rows))
}
pub async fn execute_command(&self, command: &str) -> DatabaseResult<u64> {
let client = self.client.as_ref().ok_or(DatabaseError::NotConnected)?;
let rows_affected = client.execute(command, &[]).await?;
Ok(rows_affected)
}
pub async fn close(&mut self) -> DatabaseResult<()> {
self.client = None;
Ok(())
}
pub async fn is_connected(&self) -> bool {
if let Some(client) = &self.client {
client.query("SELECT 1", &[]).await.is_ok()
} else {
false
}
}
}
/// Converts the cell at column `idx` of `row` into a JSON value.
///
/// Supported column types are `BOOL`, `INT2`, `INT4`, `INT8`, `FLOAT4`,
/// `FLOAT8`, `TEXT` and `VARCHAR`. SQL `NULL` in any of them becomes
/// `JsonValue::Null`.
///
/// Any other column type is handled on a best-effort basis: the cell is read
/// as a `String`, and if that fails for any reason (including SQL `NULL`) the
/// result is `JsonValue::Null` rather than an error.
///
/// # Errors
///
/// Returns a conversion error when `idx` is not a valid column index, or when
/// a cell of a supported type cannot be read as the matching Rust type.
fn postgres_value_to_json(row: &tokio_postgres::Row, idx: usize) -> DatabaseResult<JsonValue> {
    use tokio_postgres::types::Type;

    // Checked lookup: an invalid index is reported as an error, not a panic.
    let column = row.columns().get(idx).ok_or_else(|| {
        DatabaseError::conversion_error(format!(
            "Column index {} out of range ({} columns)",
            idx,
            row.len()
        ))
    })?;

    match *column.type_() {
        Type::BOOL => nullable_column_to_json::<bool, _>(row, idx, "Bool", JsonValue::from),
        Type::INT2 => nullable_column_to_json::<i16, _>(row, idx, "Int2", JsonValue::from),
        Type::INT4 => nullable_column_to_json::<i32, _>(row, idx, "Int4", JsonValue::from),
        Type::INT8 => nullable_column_to_json::<i64, _>(row, idx, "Int8", JsonValue::from),
        // Widen to f64 first (lossless) so the JSON number is built from f64.
        Type::FLOAT4 => nullable_column_to_json::<f32, _>(row, idx, "Float4", |f| {
            JsonValue::from(f64::from(f))
        }),
        Type::FLOAT8 => nullable_column_to_json::<f64, _>(row, idx, "Float8", JsonValue::from),
        Type::TEXT | Type::VARCHAR => {
            nullable_column_to_json::<String, _>(row, idx, "Text", JsonValue::from)
        }
        // Best effort for every other type: read as text, fall back to null.
        _ => Ok(row
            .try_get::<_, String>(idx)
            .ok()
            .map_or(JsonValue::Null, JsonValue::from)),
    }
}

/// Reads column `idx` of `row` as a nullable `T` and maps it through `to_json`.
///
/// SQL `NULL` yields `JsonValue::Null`. A read failure is reported as a
/// conversion error whose message starts with `"<label> conversion: "`.
fn nullable_column_to_json<'a, T, F>(
    row: &'a tokio_postgres::Row,
    idx: usize,
    label: &str,
    to_json: F,
) -> DatabaseResult<JsonValue>
where
    T: tokio_postgres::types::FromSql<'a>,
    F: FnOnce(T) -> JsonValue,
{
    let cell: Option<T> = row
        .try_get(idx)
        .map_err(|e| DatabaseError::conversion_error(format!("{} conversion: {}", label, e)))?;
    Ok(cell.map_or(JsonValue::Null, to_json))
}