// welds_connections/postgres/mod.rs
use super::transaction::{TransT, Transaction};
use super::Row;
use super::TransactStart;
use super::{Client, Param};
use crate::errors::Result;
use crate::ExecuteResult;
use async_trait::async_trait;
use sqlx::postgres::PgArguments;
use sqlx::query::Query;
use sqlx::{PgPool, Postgres};
use std::sync::Arc;

/// Client handle around a reference-counted sqlx Postgres connection pool.
///
/// Because the pool is stored behind an `Arc`, the derived `Clone` produces
/// another handle to the same underlying pool rather than a new pool.
#[derive(Debug, Clone)]
pub struct PostgresClient {
    // Shared pool used by every query and transaction started from this client.
    pool: Arc<PgPool>,
}

#[async_trait]
impl TransactStart for PostgresClient {
    /// Opens a new transaction on the pool and hands it back wrapped in the
    /// backend-agnostic [`Transaction`] type.
    ///
    /// # Errors
    /// Returns the error produced when the pool fails to begin a transaction.
    async fn begin(&self) -> Result<Transaction> {
        let pg_tx = self.pool.begin().await?;
        Ok(Transaction::new(TransT::Postgres(pg_tx)))
    }
}

/// Connects to the Postgres database at `url` and returns a client backed by
/// the resulting connection pool.
///
/// # Errors
/// Returns the error produced when the pool cannot be created for `url`.
pub async fn connect(url: &str) -> Result<PostgresClient> {
    let pool = Arc::new(PgPool::connect(url).await?);
    Ok(PostgresClient { pool })
}

/// Lets an already-configured sqlx pool be used as a [`PostgresClient`].
impl From<sqlx::PgPool> for PostgresClient {
    /// Takes ownership of `pool` and stores it behind an `Arc`.
    fn from(pool: sqlx::PgPool) -> Self {
        let shared = Arc::new(pool);
        Self { pool: shared }
    }
}

impl PostgresClient {
    /// Borrows the underlying sqlx pool, for callers that need to talk to
    /// sqlx directly.
    pub fn as_sqlx_pool(&self) -> &PgPool {
        self.pool.as_ref()
    }
}

use sqlx::encode::Encode;
use sqlx::types::Type;

#[async_trait]
impl Client for PostgresClient {
    async fn execute(&self, sql: &str, params: &[&(dyn Param + Sync)]) -> Result<ExecuteResult> {
        let mut query = sqlx::query::<Postgres>(sql);
        for param in params {
            query = PostgresParam::add_param(*param, query);
        }
        let r = query.execute(&*self.pool).await?;
        Ok(ExecuteResult {
            rows_affected: r.rows_affected(),
        })
    }

    async fn fetch_rows(&self, sql: &str, params: &[&(dyn Param + Sync)]) -> Result<Vec<Row>> {
        let mut query = sqlx::query::<Postgres>(sql);
        for param in params {
            query = PostgresParam::add_param(*param, query);
        }
        let mut raw_rows = query.fetch_all(&*self.pool).await?;
        let rows: Vec<Row> = raw_rows.drain(..).map(Row::from).collect();
        Ok(rows)
    }

    async fn fetch_many<'s, 'args, 't>(
        &self,
        fetches: &[crate::Fetch<'s, 'args, 't>],
    ) -> Result<Vec<Vec<Row>>> {
        let mut datasets = Vec::default();
        let mut conn = self.pool.acquire().await?;
        for fetch in fetches {
            let sql = fetch.sql;
            let params = fetch.params;
            let mut query = sqlx::query::<Postgres>(sql);
            for param in params {
                query = PostgresParam::add_param(*param, query);
            }
            let mut raw_rows = query.fetch_all(&mut *conn).await?;
            let rows: Vec<Row> = raw_rows.drain(..).map(Row::from).collect();
            datasets.push(rows);
        }
        Ok(datasets)
    }

    fn syntax(&self) -> crate::Syntax {
        crate::Syntax::Postgres
    }
}

/// A value that can be attached to a Postgres sqlx query as a parameter.
pub trait PostgresParam {
    /// Adds `self` to `query` and returns the resulting query.
    ///
    /// `self` is borrowed for `'q`, the same lifetime carried by the query,
    /// so the value must stay alive for as long as the query does.
    fn add_param<'q>(
        &'q self,
        query: Query<'q, Postgres, PgArguments>,
    ) -> Query<'q, Postgres, PgArguments>;
}

/// Blanket implementation: any type that sqlx can encode for Postgres (and
/// whose shared reference is `Send`) can be used as a [`PostgresParam`].
impl<T> PostgresParam for T
where
    // Higher-ranked bounds: `T` must satisfy these for every lifetime `'a`,
    // which in particular means `T` cannot hold non-`'static` borrows.
    for<'a> T: 'a + Send + Encode<'a, Postgres> + Type<Postgres>,
    for<'a> &'a T: Send,
{
    /// Binds a reference to `self` as the next argument of `query`.
    fn add_param<'q>(
        &'q self,
        query: Query<'q, Postgres, PgArguments>,
    ) -> Query<'q, Postgres, PgArguments> {
        query.bind(self)
    }
}