xdl_database/drivers/
postgres.rs

1//! PostgreSQL database driver
2
3use crate::{recordset::ColumnInfo, DatabaseError, DatabaseResult, Recordset};
4use serde_json::Value as JsonValue;
5use tokio_postgres::{Client, Config, NoTls};
6
7/// PostgreSQL connection
8#[derive(Debug)]
9pub struct PostgresConnection {
10    client: Option<Client>,
11}
12
13impl PostgresConnection {
14    /// Connect to a PostgreSQL database
15    pub async fn connect(connection_string: &str) -> DatabaseResult<Self> {
16        // Parse connection string
17        let config: Config = connection_string.parse().map_err(|e| {
18            DatabaseError::connection_error(format!("Invalid connection string: {}", e))
19        })?;
20
21        // Connect
22        let (client, connection) = config.connect(NoTls).await?;
23
24        // Spawn connection task
25        tokio::spawn(async move {
26            if let Err(e) = connection.await {
27                eprintln!("PostgreSQL connection error: {}", e);
28            }
29        });
30
31        Ok(Self {
32            client: Some(client),
33        })
34    }
35
36    /// Execute a SELECT query
37    pub async fn execute(&self, query: &str) -> DatabaseResult<Recordset> {
38        let client = self.client.as_ref().ok_or(DatabaseError::NotConnected)?;
39
40        // Execute query
41        let rows = client.query(query, &[]).await?;
42
43        if rows.is_empty() {
44            return Ok(Recordset::empty());
45        }
46
47        // Extract column information
48        let columns: Vec<ColumnInfo> = rows[0]
49            .columns()
50            .iter()
51            .enumerate()
52            .map(|(i, col)| ColumnInfo {
53                name: col.name().to_string(),
54                data_type: format!("{:?}", col.type_()),
55                ordinal: i,
56            })
57            .collect();
58
59        // Extract row data
60        let mut data_rows = Vec::new();
61        for row in rows {
62            let mut row_data = Vec::new();
63
64            for i in 0..row.len() {
65                let value = postgres_value_to_json(&row, i)?;
66                row_data.push(value);
67            }
68
69            data_rows.push(row_data);
70        }
71
72        Ok(Recordset::new(columns, data_rows))
73    }
74
75    /// Execute a command (INSERT, UPDATE, DELETE)
76    pub async fn execute_command(&self, command: &str) -> DatabaseResult<u64> {
77        let client = self.client.as_ref().ok_or(DatabaseError::NotConnected)?;
78
79        let rows_affected = client.execute(command, &[]).await?;
80        Ok(rows_affected)
81    }
82
83    /// Close the connection
84    pub async fn close(&mut self) -> DatabaseResult<()> {
85        self.client = None;
86        Ok(())
87    }
88
89    /// Check if connection is alive
90    pub async fn is_connected(&self) -> bool {
91        if let Some(client) = &self.client {
92            // Try a simple query
93            client.query("SELECT 1", &[]).await.is_ok()
94        } else {
95            false
96        }
97    }
98}
99
100/// Convert PostgreSQL value to JSON
101fn postgres_value_to_json(row: &tokio_postgres::Row, idx: usize) -> DatabaseResult<JsonValue> {
102    use tokio_postgres::types::Type;
103
104    let col_type = row.columns()[idx].type_();
105
106    let value = match *col_type {
107        Type::BOOL => {
108            let v: Option<bool> = row
109                .try_get(idx)
110                .map_err(|e| DatabaseError::conversion_error(format!("Bool conversion: {}", e)))?;
111            v.map(JsonValue::from).unwrap_or(JsonValue::Null)
112        }
113        Type::INT2 => {
114            let v: Option<i16> = row
115                .try_get(idx)
116                .map_err(|e| DatabaseError::conversion_error(format!("Int2 conversion: {}", e)))?;
117            v.map(JsonValue::from).unwrap_or(JsonValue::Null)
118        }
119        Type::INT4 => {
120            let v: Option<i32> = row
121                .try_get(idx)
122                .map_err(|e| DatabaseError::conversion_error(format!("Int4 conversion: {}", e)))?;
123            v.map(JsonValue::from).unwrap_or(JsonValue::Null)
124        }
125        Type::INT8 => {
126            let v: Option<i64> = row
127                .try_get(idx)
128                .map_err(|e| DatabaseError::conversion_error(format!("Int8 conversion: {}", e)))?;
129            v.map(JsonValue::from).unwrap_or(JsonValue::Null)
130        }
131        Type::FLOAT4 => {
132            let v: Option<f32> = row.try_get(idx).map_err(|e| {
133                DatabaseError::conversion_error(format!("Float4 conversion: {}", e))
134            })?;
135            v.map(|f| JsonValue::from(f as f64))
136                .unwrap_or(JsonValue::Null)
137        }
138        Type::FLOAT8 => {
139            let v: Option<f64> = row.try_get(idx).map_err(|e| {
140                DatabaseError::conversion_error(format!("Float8 conversion: {}", e))
141            })?;
142            v.map(JsonValue::from).unwrap_or(JsonValue::Null)
143        }
144        Type::TEXT | Type::VARCHAR => {
145            let v: Option<String> = row
146                .try_get(idx)
147                .map_err(|e| DatabaseError::conversion_error(format!("Text conversion: {}", e)))?;
148            v.map(JsonValue::from).unwrap_or(JsonValue::Null)
149        }
150        _ => {
151            // Try to get as string for other types
152            let v: Option<String> = row.try_get(idx).ok();
153            v.map(JsonValue::from).unwrap_or(JsonValue::Null)
154        }
155    };
156
157    Ok(value)
158}