bubble_db/
postgres.rs

1use crate::{DatabaseConfig, DatabaseConnection, DbResult};
2use async_trait::async_trait;
3use sqlx::{Column, Pool, Postgres, Row, postgres::PgPool};
4use std::collections::HashMap;
5
6#[derive(Debug)]
7pub struct PostgresConnection {
8    pool: Pool<Postgres>,
9}
10
11impl PostgresConnection {
12    pub async fn connect(config: &DatabaseConfig) -> DbResult<Self> {
13        let pool = PgPool::connect(&config.connection_string())
14            .await
15            .map_err(|e| e.to_string())?;
16
17        Ok(Self { pool })
18    }
19}
20
21#[async_trait]
22impl DatabaseConnection for PostgresConnection {
23    async fn execute(&self, sql: &str) -> DbResult<u64> {
24        let result = sqlx::query(sql)
25            .execute(&self.pool)
26            .await
27            .map_err(|e| e.to_string())?;
28
29        Ok(result.rows_affected())
30    }
31
32    async fn query(&self, sql: &str) -> DbResult<String> {
33        let rows = sqlx::query(sql)
34            .fetch_all(&self.pool)
35            .await
36            .map_err(|e| e.to_string())?;
37        let mut results = Vec::new();
38        for row in rows {
39            let mut map = HashMap::new();
40            let columns = row.columns();
41            for (i, column) in columns.iter().enumerate() {
42                let name = column.name().to_string();
43                let value: String = row.try_get(i).unwrap_or_default();
44                map.insert(name, value);
45            }
46            results.push(map);
47        }
48        serde_json::to_string(&results).map_err(|e| e.to_string())
49    }
50
51    async fn query_one(&self, sql: &str) -> DbResult<String> {
52        let row = sqlx::query(sql)
53            .fetch_one(&self.pool)
54            .await
55            .map_err(|e| e.to_string())?;
56        let mut map = HashMap::new();
57        let columns = row.columns();
58        for (i, column) in columns.iter().enumerate() {
59            let name = column.name().to_string();
60            let value: String = row.try_get(i).unwrap_or_default();
61            map.insert(name, value);
62        }
63        serde_json::to_string(&map).map_err(|e| e.to_string())
64    }
65
66    async fn insert_batch(&self, table: &str, json_data: &str) -> DbResult<u64> {
67        let items: Vec<serde_json::Value> = serde_json::from_str(json_data)
68            .map_err(|e| format!("Failed to parse JSON data: {}", e))?;
69        if items.is_empty() {
70            return Ok(0);
71        }
72        let mut sql = String::new();
73        sql.push_str(&format!("INSERT INTO {} VALUES ", table));
74        for (i, item) in items.iter().enumerate() {
75            if i > 0 {
76                sql.push_str(", ");
77            }
78            let value = crate::to_sql_value(item)?;
79            sql.push_str(&format!("({})", value));
80        }
81        self.execute(&sql).await
82    }
83}