parsql_deadpool_postgres/
pool_extensions.rs

1use crate::traits::{CrudOps, FromRow, SqlCommand, SqlParams, SqlQuery, UpdateParams};
2use deadpool_postgres::{GenericClient, Pool};
3use postgres::types::FromSql;
4use std::sync::OnceLock;
5use tokio_postgres::{Error, Row};
6
7// Daha basit bir yaklaşım: PoolError'dan genel bir Error oluştur
8fn pool_err_to_io_err(e: deadpool_postgres::PoolError) -> Error {
9    // Bu özel fonksiyon tokio_postgres'in sağladığı timeout hatasını döndürür
10    // Güzel bir çözüm değil, ama çalışır bir örnek için kullanılabilir
11    let err = Error::__private_api_timeout();
12
13    // Debug süreci için stderr'e hatayı yazdıralım
14    eprintln!("Pool bağlantı hatası: {}", e);
15
16    err
17}
18
19/// Pool extension trait for additional query operations
20#[async_trait::async_trait]
21pub trait PoolExtensions {
22    /// Inserts a new record into the database
23    async fn insert<T, P: for<'a> FromSql<'a> + Send + Sync>(&self, entity: T) -> Result<P, Error>
24    where
25        T: SqlCommand + SqlParams + Send + Sync + 'static;
26
27    /// Updates an existing record in the database
28    async fn update<T>(&self, entity: T) -> Result<bool, Error>
29    where
30        T: SqlCommand + UpdateParams + Send + Sync + 'static;
31
32    /// Deletes a record from the database
33    async fn delete<T>(&self, entity: T) -> Result<u64, Error>
34    where
35        T: SqlCommand + SqlParams + Send + Sync + 'static;
36
37    /// Retrieves a single record from the database
38    async fn fetch<P, R>(&self, params: P) -> Result<R, Error>
39    where
40        P: SqlQuery<R> + SqlParams + Send + Sync + 'static,
41        R: FromRow + Send + Sync + 'static;
42
43    /// Retrieves multiple records from the database
44    async fn fetch_all<P, R>(&self, params: P) -> Result<Vec<R>, Error>
45    where
46        P: SqlQuery<R> + SqlParams + Send + Sync + 'static,
47        R: FromRow + Send + Sync + 'static;
48}
49
50#[async_trait::async_trait]
51impl PoolExtensions for Pool {
52    async fn insert<T, P: for<'a> FromSql<'a> + Send + Sync>(&self, entity: T) -> Result<P, Error>
53    where
54        T: SqlCommand + SqlParams + Send + Sync + 'static,
55    {
56        let client = self.get().await.map_err(pool_err_to_io_err)?;
57
58        let sql = T::query();
59
60        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
61        let is_trace_enabled =
62            *TRACE_ENABLED.get_or_init(|| std::env::var("PARSQL_TRACE").unwrap_or_default() == "1");
63
64        if is_trace_enabled {
65            println!("[PARSQL-DEADPOOL-POSTGRES] Execute SQL: {}", sql);
66        }
67
68        let params = entity.params();
69        let row = client.query_one(&sql, &params).await?;
70        row.try_get::<_, P>(0)
71    }
72
73    async fn update<T>(&self, entity: T) -> Result<bool, Error>
74    where
75        T: SqlCommand + UpdateParams + Send + Sync + 'static,
76    {
77        let client = self.get().await.map_err(pool_err_to_io_err)?;
78
79        let sql = T::query();
80
81        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
82        let is_trace_enabled =
83            *TRACE_ENABLED.get_or_init(|| std::env::var("PARSQL_TRACE").unwrap_or_default() == "1");
84
85        if is_trace_enabled {
86            println!("[PARSQL-DEADPOOL-POSTGRES] Execute SQL: {}", sql);
87        }
88
89        let params = entity.params();
90        let result = client.execute(&sql, &params).await?;
91        Ok(result > 0)
92    }
93
94    async fn delete<T>(&self, entity: T) -> Result<u64, Error>
95    where
96        T: SqlCommand + SqlParams + Send + Sync + 'static,
97    {
98        let client = self.get().await.map_err(pool_err_to_io_err)?;
99
100        let sql = T::query();
101
102        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
103        let is_trace_enabled =
104            *TRACE_ENABLED.get_or_init(|| std::env::var("PARSQL_TRACE").unwrap_or_default() == "1");
105
106        if is_trace_enabled {
107            println!("[PARSQL-DEADPOOL-POSTGRES] Execute SQL: {}", sql);
108        }
109
110        let params = entity.params();
111        client.execute(&sql, &params).await
112    }
113
114    async fn fetch<P, R>(&self, params: P) -> Result<R, Error>
115    where
116        P: SqlQuery<R> + SqlParams + Send + Sync + 'static,
117        R: FromRow + Send + Sync + 'static,
118    {
119        let client = self.get().await.map_err(pool_err_to_io_err)?;
120
121        let sql = P::query();
122
123        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
124        let is_trace_enabled =
125            *TRACE_ENABLED.get_or_init(|| std::env::var("PARSQL_TRACE").unwrap_or_default() == "1");
126
127        if is_trace_enabled {
128            println!("[PARSQL-DEADPOOL-POSTGRES] Execute SQL: {}", sql);
129        }
130
131        let query_params = params.params();
132        let row = client.query_one(&sql, &query_params).await?;
133        R::from_row(&row)
134    }
135
136    async fn fetch_all<P, R>(&self, params: P) -> Result<Vec<R>, Error>
137    where
138        P: SqlQuery<R> + SqlParams + Send + Sync + 'static,
139        R: FromRow + Send + Sync + 'static,
140    {
141        let client = self.get().await.map_err(pool_err_to_io_err)?;
142
143        let sql = P::query();
144
145        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
146        let is_trace_enabled =
147            *TRACE_ENABLED.get_or_init(|| std::env::var("PARSQL_TRACE").unwrap_or_default() == "1");
148
149        if is_trace_enabled {
150            println!("[PARSQL-DEADPOOL-POSTGRES] Execute SQL: {}", sql);
151        }
152
153        let query_params = params.params();
154        let rows = client.query(&sql, &query_params).await?;
155
156        let mut results = Vec::with_capacity(rows.len());
157        for row in rows {
158            results.push(R::from_row(&row)?);
159        }
160
161        Ok(results)
162    }
163}