parsql_deadpool_postgres/
pool_extensions.rs1use crate::traits::{CrudOps, FromRow, SqlCommand, SqlParams, SqlQuery, UpdateParams};
2use deadpool_postgres::{GenericClient, Pool};
3use postgres::types::FromSql;
4use std::sync::OnceLock;
5use tokio_postgres::{Error, Row};
6
7fn pool_err_to_io_err(e: deadpool_postgres::PoolError) -> Error {
9 let err = Error::__private_api_timeout();
12
13 eprintln!("Pool bağlantı hatası: {}", e);
15
16 err
17}
18
19#[async_trait::async_trait]
21pub trait PoolExtensions {
22 async fn insert<T, P: for<'a> FromSql<'a> + Send + Sync>(&self, entity: T) -> Result<P, Error>
24 where
25 T: SqlCommand + SqlParams + Send + Sync + 'static;
26
27 async fn update<T>(&self, entity: T) -> Result<bool, Error>
29 where
30 T: SqlCommand + UpdateParams + Send + Sync + 'static;
31
32 async fn delete<T>(&self, entity: T) -> Result<u64, Error>
34 where
35 T: SqlCommand + SqlParams + Send + Sync + 'static;
36
37 async fn fetch<P, R>(&self, params: P) -> Result<R, Error>
39 where
40 P: SqlQuery<R> + SqlParams + Send + Sync + 'static,
41 R: FromRow + Send + Sync + 'static;
42
43 async fn fetch_all<P, R>(&self, params: P) -> Result<Vec<R>, Error>
45 where
46 P: SqlQuery<R> + SqlParams + Send + Sync + 'static,
47 R: FromRow + Send + Sync + 'static;
48}
49
50#[async_trait::async_trait]
51impl PoolExtensions for Pool {
52 async fn insert<T, P: for<'a> FromSql<'a> + Send + Sync>(&self, entity: T) -> Result<P, Error>
53 where
54 T: SqlCommand + SqlParams + Send + Sync + 'static,
55 {
56 let client = self.get().await.map_err(pool_err_to_io_err)?;
57
58 let sql = T::query();
59
60 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
61 let is_trace_enabled =
62 *TRACE_ENABLED.get_or_init(|| std::env::var("PARSQL_TRACE").unwrap_or_default() == "1");
63
64 if is_trace_enabled {
65 println!("[PARSQL-DEADPOOL-POSTGRES] Execute SQL: {}", sql);
66 }
67
68 let params = entity.params();
69 let row = client.query_one(&sql, ¶ms).await?;
70 row.try_get::<_, P>(0)
71 }
72
73 async fn update<T>(&self, entity: T) -> Result<bool, Error>
74 where
75 T: SqlCommand + UpdateParams + Send + Sync + 'static,
76 {
77 let client = self.get().await.map_err(pool_err_to_io_err)?;
78
79 let sql = T::query();
80
81 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
82 let is_trace_enabled =
83 *TRACE_ENABLED.get_or_init(|| std::env::var("PARSQL_TRACE").unwrap_or_default() == "1");
84
85 if is_trace_enabled {
86 println!("[PARSQL-DEADPOOL-POSTGRES] Execute SQL: {}", sql);
87 }
88
89 let params = entity.params();
90 let result = client.execute(&sql, ¶ms).await?;
91 Ok(result > 0)
92 }
93
94 async fn delete<T>(&self, entity: T) -> Result<u64, Error>
95 where
96 T: SqlCommand + SqlParams + Send + Sync + 'static,
97 {
98 let client = self.get().await.map_err(pool_err_to_io_err)?;
99
100 let sql = T::query();
101
102 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
103 let is_trace_enabled =
104 *TRACE_ENABLED.get_or_init(|| std::env::var("PARSQL_TRACE").unwrap_or_default() == "1");
105
106 if is_trace_enabled {
107 println!("[PARSQL-DEADPOOL-POSTGRES] Execute SQL: {}", sql);
108 }
109
110 let params = entity.params();
111 client.execute(&sql, ¶ms).await
112 }
113
114 async fn fetch<P, R>(&self, params: P) -> Result<R, Error>
115 where
116 P: SqlQuery<R> + SqlParams + Send + Sync + 'static,
117 R: FromRow + Send + Sync + 'static,
118 {
119 let client = self.get().await.map_err(pool_err_to_io_err)?;
120
121 let sql = P::query();
122
123 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
124 let is_trace_enabled =
125 *TRACE_ENABLED.get_or_init(|| std::env::var("PARSQL_TRACE").unwrap_or_default() == "1");
126
127 if is_trace_enabled {
128 println!("[PARSQL-DEADPOOL-POSTGRES] Execute SQL: {}", sql);
129 }
130
131 let query_params = params.params();
132 let row = client.query_one(&sql, &query_params).await?;
133 R::from_row(&row)
134 }
135
136 async fn fetch_all<P, R>(&self, params: P) -> Result<Vec<R>, Error>
137 where
138 P: SqlQuery<R> + SqlParams + Send + Sync + 'static,
139 R: FromRow + Send + Sync + 'static,
140 {
141 let client = self.get().await.map_err(pool_err_to_io_err)?;
142
143 let sql = P::query();
144
145 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
146 let is_trace_enabled =
147 *TRACE_ENABLED.get_or_init(|| std::env::var("PARSQL_TRACE").unwrap_or_default() == "1");
148
149 if is_trace_enabled {
150 println!("[PARSQL-DEADPOOL-POSTGRES] Execute SQL: {}", sql);
151 }
152
153 let query_params = params.params();
154 let rows = client.query(&sql, &query_params).await?;
155
156 let mut results = Vec::with_capacity(rows.len());
157 for row in rows {
158 results.push(R::from_row(&row)?);
159 }
160
161 Ok(results)
162 }
163}