// burncloud_database_impl/postgres.rs
#[cfg(feature = "postgres")]
2use sqlx::{PgPool, Row, Column};
3use burncloud_database_core::{
4 DatabaseConnection, QueryExecutor,
5 TransactionManager, Transaction, QueryContext, QueryOptions, QueryResult, QueryParam
6};
7use burncloud_database_core::error::{DatabaseResult, DatabaseError};
8use async_trait::async_trait;
9use std::collections::HashMap;
10
#[cfg(feature = "postgres")]
/// Handle to a PostgreSQL database backed by an `sqlx` connection pool.
///
/// `Debug` is intentionally not derived: the connection string may embed
/// credentials that should not end up in logs.
pub struct PostgresConnection {
    /// Active pool; `None` until `connect` stores one and again after
    /// `disconnect` takes it out.
    pool: Option<PgPool>,
    /// Connection string passed to `PgPool::connect`.
    connection_string: String,
}
16
#[cfg(feature = "postgres")]
impl PostgresConnection {
    /// Builds a handle in the disconnected state.
    ///
    /// Only stores `connection_string`; no pool is created here, so the
    /// `pool` field starts out as `None`.
    pub fn new(connection_string: String) -> Self {
        let pool = None;
        PostgresConnection {
            pool,
            connection_string,
        }
    }
}
26
#[cfg(feature = "postgres")]
#[async_trait]
impl DatabaseConnection for PostgresConnection {
    /// Opens a pool with the stored connection string and keeps it.
    ///
    /// # Errors
    /// Returns `DatabaseError::ConnectionFailed` carrying the sqlx error text
    /// when the pool cannot be created; the stored pool is left untouched then.
    async fn connect(&mut self) -> DatabaseResult<()> {
        match PgPool::connect(&self.connection_string).await {
            Ok(pool) => {
                self.pool = Some(pool);
                Ok(())
            }
            Err(e) => Err(DatabaseError::ConnectionFailed(e.to_string())),
        }
    }

    /// Closes the pool if one is held; always returns `Ok(())`.
    async fn disconnect(&mut self) -> DatabaseResult<()> {
        // `take` leaves `None` behind, so a second call has nothing to close.
        let held = self.pool.take();
        if let Some(pool) = held {
            pool.close().await;
        }
        Ok(())
    }

    /// Reports whether a pool is currently held.
    ///
    /// This only checks that the field is populated; it does not contact the
    /// server (use `ping` for that).
    async fn is_connected(&self) -> bool {
        self.pool.as_ref().is_some()
    }

    /// Runs `SELECT 1` against the pool.
    ///
    /// # Errors
    /// * `DatabaseError::ConnectionFailed("Not connected")` when no pool is held.
    /// * `DatabaseError::QueryFailed` when the statement itself fails.
    async fn ping(&self) -> DatabaseResult<()> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| DatabaseError::ConnectionFailed("Not connected".to_string()))?;

        sqlx::query("SELECT 1")
            .execute(pool)
            .await
            .map(|_| ())
            .map_err(|e| DatabaseError::QueryFailed(e.to_string()))
    }
}
62
#[cfg(feature = "postgres")]
#[async_trait]
impl QueryExecutor for PostgresConnection {
    /// Runs `query` with `fetch_all` and returns every row as a
    /// column-name -> JSON value map.
    ///
    /// Column values are decoded according to their Postgres type (see
    /// `pg_column_to_json`). Previously every column was decoded as
    /// `serde_json::Value`, which only succeeds for JSON/JSONB columns, so
    /// ordinary scalar columns silently came back as `Null`.
    ///
    /// NOTE(review): `_params` is accepted but never bound to the statement,
    /// and `_context` is unused. `rows_affected` is always reported as 0 and
    /// `last_insert_id` as `None`.
    ///
    /// # Errors
    /// * `DatabaseError::ConnectionFailed("Not connected")` when no pool is held.
    /// * `DatabaseError::QueryFailed` when sqlx reports an error.
    async fn execute_query(
        &self,
        query: &str,
        _params: &[&dyn QueryParam],
        _context: &QueryContext,
    ) -> DatabaseResult<QueryResult> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| DatabaseError::ConnectionFailed("Not connected".to_string()))?;

        let rows = sqlx::query(query)
            .fetch_all(pool)
            .await
            .map_err(|e| DatabaseError::QueryFailed(e.to_string()))?;

        let mut result_rows = Vec::with_capacity(rows.len());
        for row in &rows {
            let columns = row.columns();
            let mut row_map = HashMap::with_capacity(columns.len());
            for (index, column) in columns.iter().enumerate() {
                // Fully-qualified call so no extra `use sqlx::TypeInfo` is needed.
                let type_name = sqlx::TypeInfo::name(column.type_info());
                row_map.insert(
                    column.name().to_string(),
                    pg_column_to_json(row, index, type_name),
                );
            }
            result_rows.push(row_map);
        }

        Ok(QueryResult {
            rows: result_rows,
            rows_affected: 0,
            last_insert_id: None,
        })
    }

    /// Same as `execute_query`; `_options` is currently ignored.
    async fn execute_query_with_options(
        &self,
        query: &str,
        params: &[&dyn QueryParam],
        _options: &QueryOptions,
        context: &QueryContext,
    ) -> DatabaseResult<QueryResult> {
        self.execute_query(query, params, context).await
    }
}

/// Decodes the column at `index` of `row` into a JSON value, choosing the Rust
/// target type from the Postgres type name.
///
/// SQL `NULL`, decode failures, and types not listed here (after a final
/// attempt to read the column as JSON/JSONB) all yield `Value::Null`, which
/// keeps the previous best-effort contract of never failing on a column.
#[cfg(feature = "postgres")]
fn pg_column_to_json(
    row: &sqlx::postgres::PgRow,
    index: usize,
    type_name: &str,
) -> serde_json::Value {
    use serde_json::Value;

    let decoded: Result<Option<Value>, sqlx::Error> = match type_name {
        "BOOL" => row
            .try_get::<Option<bool>, _>(index)
            .map(|v| v.map(Value::from)),
        "INT2" => row
            .try_get::<Option<i16>, _>(index)
            .map(|v| v.map(Value::from)),
        "INT4" => row
            .try_get::<Option<i32>, _>(index)
            .map(|v| v.map(Value::from)),
        "INT8" => row
            .try_get::<Option<i64>, _>(index)
            .map(|v| v.map(Value::from)),
        "FLOAT4" => row
            .try_get::<Option<f32>, _>(index)
            .map(|v| v.map(Value::from)),
        "FLOAT8" => row
            .try_get::<Option<f64>, _>(index)
            .map(|v| v.map(Value::from)),
        "TEXT" | "VARCHAR" | "CHAR" | "NAME" => row
            .try_get::<Option<String>, _>(index)
            .map(|v| v.map(Value::from)),
        // JSON / JSONB, plus anything else: same attempt the original made.
        _ => row.try_get::<Option<Value>, _>(index),
    };

    decoded.ok().flatten().unwrap_or(Value::Null)
}
109
#[cfg(feature = "postgres")]
/// Transaction handle returned by `begin_transaction`.
///
/// Currently a stateless placeholder: it holds no connection or
/// database-side transaction.
pub struct PostgresTransaction {}
114
#[cfg(feature = "postgres")]
#[async_trait]
impl Transaction for PostgresTransaction {
    /// Consumes the handle and reports success; nothing is sent to the database.
    async fn commit(self) -> DatabaseResult<()> {
        Ok(())
    }

    /// Consumes the handle and reports success; nothing is sent to the database.
    async fn rollback(self) -> DatabaseResult<()> {
        Ok(())
    }

    /// Stub: ignores `_query` and `_params` and returns an empty result
    /// (no rows, zero rows affected, no insert id).
    async fn execute_query(
        &self,
        _query: &str,
        _params: &[&dyn QueryParam],
    ) -> DatabaseResult<QueryResult> {
        let empty = QueryResult {
            rows: Vec::new(),
            rows_affected: 0,
            last_insert_id: None,
        };
        Ok(empty)
    }
}
138
#[cfg(feature = "postgres")]
#[async_trait]
impl TransactionManager for PostgresConnection {
    type Transaction = PostgresTransaction;

    /// Returns a fresh placeholder `PostgresTransaction`.
    ///
    /// The pool is not consulted and `_context` is unused, so this succeeds
    /// even while disconnected.
    async fn begin_transaction(
        &self,
        _context: &QueryContext,
    ) -> DatabaseResult<Self::Transaction> {
        let transaction = PostgresTransaction {};
        Ok(transaction)
    }
}