xdl_database/drivers/
postgres.rs1use crate::{recordset::ColumnInfo, DatabaseError, DatabaseResult, Recordset};
4use serde_json::Value as JsonValue;
5use tokio_postgres::{Client, Config, NoTls};
6
7#[derive(Debug)]
9pub struct PostgresConnection {
10 client: Option<Client>,
11}
12
13impl PostgresConnection {
14 pub async fn connect(connection_string: &str) -> DatabaseResult<Self> {
16 let config: Config = connection_string.parse().map_err(|e| {
18 DatabaseError::connection_error(format!("Invalid connection string: {}", e))
19 })?;
20
21 let (client, connection) = config.connect(NoTls).await?;
23
24 tokio::spawn(async move {
26 if let Err(e) = connection.await {
27 eprintln!("PostgreSQL connection error: {}", e);
28 }
29 });
30
31 Ok(Self {
32 client: Some(client),
33 })
34 }
35
36 pub async fn execute(&self, query: &str) -> DatabaseResult<Recordset> {
38 let client = self.client.as_ref().ok_or(DatabaseError::NotConnected)?;
39
40 let rows = client.query(query, &[]).await?;
42
43 if rows.is_empty() {
44 return Ok(Recordset::empty());
45 }
46
47 let columns: Vec<ColumnInfo> = rows[0]
49 .columns()
50 .iter()
51 .enumerate()
52 .map(|(i, col)| ColumnInfo {
53 name: col.name().to_string(),
54 data_type: format!("{:?}", col.type_()),
55 ordinal: i,
56 })
57 .collect();
58
59 let mut data_rows = Vec::new();
61 for row in rows {
62 let mut row_data = Vec::new();
63
64 for i in 0..row.len() {
65 let value = postgres_value_to_json(&row, i)?;
66 row_data.push(value);
67 }
68
69 data_rows.push(row_data);
70 }
71
72 Ok(Recordset::new(columns, data_rows))
73 }
74
75 pub async fn execute_command(&self, command: &str) -> DatabaseResult<u64> {
77 let client = self.client.as_ref().ok_or(DatabaseError::NotConnected)?;
78
79 let rows_affected = client.execute(command, &[]).await?;
80 Ok(rows_affected)
81 }
82
83 pub async fn close(&mut self) -> DatabaseResult<()> {
85 self.client = None;
86 Ok(())
87 }
88
89 pub async fn is_connected(&self) -> bool {
91 if let Some(client) = &self.client {
92 client.query("SELECT 1", &[]).await.is_ok()
94 } else {
95 false
96 }
97 }
98}
99
100fn postgres_value_to_json(row: &tokio_postgres::Row, idx: usize) -> DatabaseResult<JsonValue> {
102 use tokio_postgres::types::Type;
103
104 let col_type = row.columns()[idx].type_();
105
106 let value = match *col_type {
107 Type::BOOL => {
108 let v: Option<bool> = row
109 .try_get(idx)
110 .map_err(|e| DatabaseError::conversion_error(format!("Bool conversion: {}", e)))?;
111 v.map(JsonValue::from).unwrap_or(JsonValue::Null)
112 }
113 Type::INT2 => {
114 let v: Option<i16> = row
115 .try_get(idx)
116 .map_err(|e| DatabaseError::conversion_error(format!("Int2 conversion: {}", e)))?;
117 v.map(JsonValue::from).unwrap_or(JsonValue::Null)
118 }
119 Type::INT4 => {
120 let v: Option<i32> = row
121 .try_get(idx)
122 .map_err(|e| DatabaseError::conversion_error(format!("Int4 conversion: {}", e)))?;
123 v.map(JsonValue::from).unwrap_or(JsonValue::Null)
124 }
125 Type::INT8 => {
126 let v: Option<i64> = row
127 .try_get(idx)
128 .map_err(|e| DatabaseError::conversion_error(format!("Int8 conversion: {}", e)))?;
129 v.map(JsonValue::from).unwrap_or(JsonValue::Null)
130 }
131 Type::FLOAT4 => {
132 let v: Option<f32> = row.try_get(idx).map_err(|e| {
133 DatabaseError::conversion_error(format!("Float4 conversion: {}", e))
134 })?;
135 v.map(|f| JsonValue::from(f as f64))
136 .unwrap_or(JsonValue::Null)
137 }
138 Type::FLOAT8 => {
139 let v: Option<f64> = row.try_get(idx).map_err(|e| {
140 DatabaseError::conversion_error(format!("Float8 conversion: {}", e))
141 })?;
142 v.map(JsonValue::from).unwrap_or(JsonValue::Null)
143 }
144 Type::TEXT | Type::VARCHAR => {
145 let v: Option<String> = row
146 .try_get(idx)
147 .map_err(|e| DatabaseError::conversion_error(format!("Text conversion: {}", e)))?;
148 v.map(JsonValue::from).unwrap_or(JsonValue::Null)
149 }
150 _ => {
151 let v: Option<String> = row.try_get(idx).ok();
153 v.map(JsonValue::from).unwrap_or(JsonValue::Null)
154 }
155 };
156
157 Ok(value)
158}