burncloud_database_impl/
postgres.rs

1#[cfg(feature = "postgres")]
2use sqlx::{PgPool, Row, Column};
3use burncloud_database_core::{
4    DatabaseConnection, QueryExecutor,
5    TransactionManager, Transaction, QueryContext, QueryOptions, QueryResult, QueryParam
6};
7use burncloud_database_core::error::{DatabaseResult, DatabaseError};
8use async_trait::async_trait;
9use std::collections::HashMap;
10
11#[cfg(feature = "postgres")]
12pub struct PostgresConnection {
13    pool: Option<PgPool>,
14    connection_string: String,
15}
16
17#[cfg(feature = "postgres")]
18impl PostgresConnection {
19    pub fn new(connection_string: String) -> Self {
20        Self {
21            pool: None,
22            connection_string,
23        }
24    }
25}
26
27#[cfg(feature = "postgres")]
28#[async_trait]
29impl DatabaseConnection for PostgresConnection {
30    async fn connect(&mut self) -> DatabaseResult<()> {
31        let pool = PgPool::connect(&self.connection_string)
32            .await
33            .map_err(|e| DatabaseError::ConnectionFailed(e.to_string()))?;
34
35        self.pool = Some(pool);
36        Ok(())
37    }
38
39    async fn disconnect(&mut self) -> DatabaseResult<()> {
40        if let Some(pool) = self.pool.take() {
41            pool.close().await;
42        }
43        Ok(())
44    }
45
46    async fn is_connected(&self) -> bool {
47        self.pool.is_some()
48    }
49
50    async fn ping(&self) -> DatabaseResult<()> {
51        if let Some(pool) = &self.pool {
52            sqlx::query("SELECT 1")
53                .execute(pool)
54                .await
55                .map_err(|e| DatabaseError::QueryFailed(e.to_string()))?;
56            Ok(())
57        } else {
58            Err(DatabaseError::ConnectionFailed("Not connected".to_string()))
59        }
60    }
61}
62
63#[cfg(feature = "postgres")]
64#[async_trait]
65impl QueryExecutor for PostgresConnection {
66    async fn execute_query(
67        &self,
68        query: &str,
69        _params: &[&dyn QueryParam],
70        _context: &QueryContext,
71    ) -> DatabaseResult<QueryResult> {
72        if let Some(pool) = &self.pool {
73            let rows = sqlx::query(query)
74                .fetch_all(pool)
75                .await
76                .map_err(|e| DatabaseError::QueryFailed(e.to_string()))?;
77
78            let mut result_rows = Vec::new();
79            for row in rows {
80                let mut row_map = HashMap::new();
81                for (i, column) in row.columns().iter().enumerate() {
82                    let value: serde_json::Value = row.try_get(i)
83                        .unwrap_or(serde_json::Value::Null);
84                    row_map.insert(column.name().to_string(), value);
85                }
86                result_rows.push(row_map);
87            }
88
89            Ok(QueryResult {
90                rows: result_rows,
91                rows_affected: 0,
92                last_insert_id: None,
93            })
94        } else {
95            Err(DatabaseError::ConnectionFailed("Not connected".to_string()))
96        }
97    }
98
99    async fn execute_query_with_options(
100        &self,
101        query: &str,
102        params: &[&dyn QueryParam],
103        _options: &QueryOptions,
104        context: &QueryContext,
105    ) -> DatabaseResult<QueryResult> {
106        self.execute_query(query, params, context).await
107    }
108}
109
110#[cfg(feature = "postgres")]
111pub struct PostgresTransaction {
112    // Simplified for example
113}
114
115#[cfg(feature = "postgres")]
116#[async_trait]
117impl Transaction for PostgresTransaction {
118    async fn commit(self) -> DatabaseResult<()> {
119        Ok(())
120    }
121
122    async fn rollback(self) -> DatabaseResult<()> {
123        Ok(())
124    }
125
126    async fn execute_query(
127        &self,
128        _query: &str,
129        _params: &[&dyn QueryParam],
130    ) -> DatabaseResult<QueryResult> {
131        Ok(QueryResult {
132            rows: Vec::new(),
133            rows_affected: 0,
134            last_insert_id: None,
135        })
136    }
137}
138
139#[cfg(feature = "postgres")]
140#[async_trait]
141impl TransactionManager for PostgresConnection {
142    type Transaction = PostgresTransaction;
143
144    async fn begin_transaction(&self, _context: &QueryContext) -> DatabaseResult<Self::Transaction> {
145        Ok(PostgresTransaction {})
146    }
147}