1use crate::{DatabaseConfig, DatabaseConnection, DbResult};
2use async_trait::async_trait;
3use sqlx::{Column, Pool, Postgres, Row, postgres::PgPool};
4use std::collections::HashMap;
5
6#[derive(Debug)]
7pub struct PostgresConnection {
8 pool: Pool<Postgres>,
9}
10
11impl PostgresConnection {
12 pub async fn connect(config: &DatabaseConfig) -> DbResult<Self> {
13 let pool = PgPool::connect(&config.connection_string())
14 .await
15 .map_err(|e| e.to_string())?;
16
17 Ok(Self { pool })
18 }
19}
20
21#[async_trait]
22impl DatabaseConnection for PostgresConnection {
23 async fn execute(&self, sql: &str) -> DbResult<u64> {
24 let result = sqlx::query(sql)
25 .execute(&self.pool)
26 .await
27 .map_err(|e| e.to_string())?;
28
29 Ok(result.rows_affected())
30 }
31
32 async fn query(&self, sql: &str) -> DbResult<String> {
33 let rows = sqlx::query(sql)
34 .fetch_all(&self.pool)
35 .await
36 .map_err(|e| e.to_string())?;
37 let mut results = Vec::new();
38 for row in rows {
39 let mut map = HashMap::new();
40 let columns = row.columns();
41 for (i, column) in columns.iter().enumerate() {
42 let name = column.name().to_string();
43 let value: String = row.try_get(i).unwrap_or_default();
44 map.insert(name, value);
45 }
46 results.push(map);
47 }
48 serde_json::to_string(&results).map_err(|e| e.to_string())
49 }
50
51 async fn query_one(&self, sql: &str) -> DbResult<String> {
52 let row = sqlx::query(sql)
53 .fetch_one(&self.pool)
54 .await
55 .map_err(|e| e.to_string())?;
56 let mut map = HashMap::new();
57 let columns = row.columns();
58 for (i, column) in columns.iter().enumerate() {
59 let name = column.name().to_string();
60 let value: String = row.try_get(i).unwrap_or_default();
61 map.insert(name, value);
62 }
63 serde_json::to_string(&map).map_err(|e| e.to_string())
64 }
65
66 async fn insert_batch(&self, table: &str, json_data: &str) -> DbResult<u64> {
67 let items: Vec<serde_json::Value> = serde_json::from_str(json_data)
68 .map_err(|e| format!("Failed to parse JSON data: {}", e))?;
69 if items.is_empty() {
70 return Ok(0);
71 }
72 let mut sql = String::new();
73 sql.push_str(&format!("INSERT INTO {} VALUES ", table));
74 for (i, item) in items.iter().enumerate() {
75 if i > 0 {
76 sql.push_str(", ");
77 }
78 let value = crate::to_sql_value(item)?;
79 sql.push_str(&format!("({})", value));
80 }
81 self.execute(&sql).await
82 }
83}