1use std::marker::PhantomData;
4
5use prax_query::QueryResult;
6use prax_query::filter::FilterValue;
7use prax_query::traits::{BoxFuture, Model, QueryEngine};
8use tracing::debug;
9
10use crate::pool::PgPool;
11use crate::types::filter_value_to_sql;
12
13#[derive(Clone)]
15pub struct PgEngine {
16 pool: PgPool,
17}
18
19impl PgEngine {
20 pub fn new(pool: PgPool) -> Self {
22 Self { pool }
23 }
24
25 pub fn pool(&self) -> &PgPool {
27 &self.pool
28 }
29
30 fn to_params(
32 values: &[FilterValue],
33 ) -> Result<Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>, prax_query::QueryError>
34 {
35 values
36 .iter()
37 .map(|v| {
38 filter_value_to_sql(v).map_err(|e| prax_query::QueryError::database(e.to_string()))
39 })
40 .collect()
41 }
42}
43
44impl QueryEngine for PgEngine {
45 fn query_many<T: Model + Send + 'static>(
46 &self,
47 sql: &str,
48 params: Vec<FilterValue>,
49 ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
50 let sql = sql.to_string();
51 Box::pin(async move {
52 debug!(sql = %sql, "Executing query_many");
53
54 let conn = self
55 .pool
56 .get()
57 .await
58 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
59
60 let pg_params = Self::to_params(¶ms)?;
61 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
62 pg_params.iter().map(|p| p.as_ref() as _).collect();
63
64 let rows = conn
65 .query(&sql, ¶m_refs)
66 .await
67 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
68
69 let _ = rows;
73 Ok(Vec::new())
74 })
75 }
76
77 fn query_one<T: Model + Send + 'static>(
78 &self,
79 sql: &str,
80 params: Vec<FilterValue>,
81 ) -> BoxFuture<'_, QueryResult<T>> {
82 let sql = sql.to_string();
83 Box::pin(async move {
84 debug!(sql = %sql, "Executing query_one");
85
86 let conn = self
87 .pool
88 .get()
89 .await
90 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
91
92 let pg_params = Self::to_params(¶ms)?;
93 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
94 pg_params.iter().map(|p| p.as_ref() as _).collect();
95
96 let _row = conn.query_one(&sql, ¶m_refs).await.map_err(|e| {
97 if e.to_string().contains("no rows") {
98 prax_query::QueryError::not_found(T::MODEL_NAME)
99 } else {
100 prax_query::QueryError::database(e.to_string())
101 }
102 })?;
103
104 Err(prax_query::QueryError::internal(
106 "deserialization not yet implemented".to_string(),
107 ))
108 })
109 }
110
111 fn query_optional<T: Model + Send + 'static>(
112 &self,
113 sql: &str,
114 params: Vec<FilterValue>,
115 ) -> BoxFuture<'_, QueryResult<Option<T>>> {
116 let sql = sql.to_string();
117 Box::pin(async move {
118 debug!(sql = %sql, "Executing query_optional");
119
120 let conn = self
121 .pool
122 .get()
123 .await
124 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
125
126 let pg_params = Self::to_params(¶ms)?;
127 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
128 pg_params.iter().map(|p| p.as_ref() as _).collect();
129
130 let row = conn
131 .query_opt(&sql, ¶m_refs)
132 .await
133 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
134
135 match row {
136 Some(_row) => {
137 Err(prax_query::QueryError::internal(
139 "deserialization not yet implemented".to_string(),
140 ))
141 }
142 None => Ok(None),
143 }
144 })
145 }
146
147 fn execute_insert<T: Model + Send + 'static>(
148 &self,
149 sql: &str,
150 params: Vec<FilterValue>,
151 ) -> BoxFuture<'_, QueryResult<T>> {
152 let sql = sql.to_string();
153 Box::pin(async move {
154 debug!(sql = %sql, "Executing insert");
155
156 let conn = self
157 .pool
158 .get()
159 .await
160 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
161
162 let pg_params = Self::to_params(¶ms)?;
163 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
164 pg_params.iter().map(|p| p.as_ref() as _).collect();
165
166 let _row = conn
167 .query_one(&sql, ¶m_refs)
168 .await
169 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
170
171 Err(prax_query::QueryError::internal(
173 "deserialization not yet implemented".to_string(),
174 ))
175 })
176 }
177
178 fn execute_update<T: Model + Send + 'static>(
179 &self,
180 sql: &str,
181 params: Vec<FilterValue>,
182 ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
183 let sql = sql.to_string();
184 Box::pin(async move {
185 debug!(sql = %sql, "Executing update");
186
187 let conn = self
188 .pool
189 .get()
190 .await
191 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
192
193 let pg_params = Self::to_params(¶ms)?;
194 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
195 pg_params.iter().map(|p| p.as_ref() as _).collect();
196
197 let rows = conn
198 .query(&sql, ¶m_refs)
199 .await
200 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
201
202 let _ = rows;
204 Ok(Vec::new())
205 })
206 }
207
208 fn execute_delete(
209 &self,
210 sql: &str,
211 params: Vec<FilterValue>,
212 ) -> BoxFuture<'_, QueryResult<u64>> {
213 let sql = sql.to_string();
214 Box::pin(async move {
215 debug!(sql = %sql, "Executing delete");
216
217 let conn = self
218 .pool
219 .get()
220 .await
221 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
222
223 let pg_params = Self::to_params(¶ms)?;
224 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
225 pg_params.iter().map(|p| p.as_ref() as _).collect();
226
227 let count = conn
228 .execute(&sql, ¶m_refs)
229 .await
230 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
231
232 Ok(count)
233 })
234 }
235
236 fn execute_raw(&self, sql: &str, params: Vec<FilterValue>) -> BoxFuture<'_, QueryResult<u64>> {
237 let sql = sql.to_string();
238 Box::pin(async move {
239 debug!(sql = %sql, "Executing raw SQL");
240
241 let conn = self
242 .pool
243 .get()
244 .await
245 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
246
247 let pg_params = Self::to_params(¶ms)?;
248 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
249 pg_params.iter().map(|p| p.as_ref() as _).collect();
250
251 let count = conn
252 .execute(&sql, ¶m_refs)
253 .await
254 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
255
256 Ok(count)
257 })
258 }
259
260 fn count(&self, sql: &str, params: Vec<FilterValue>) -> BoxFuture<'_, QueryResult<u64>> {
261 let sql = sql.to_string();
262 Box::pin(async move {
263 debug!(sql = %sql, "Executing count");
264
265 let conn = self
266 .pool
267 .get()
268 .await
269 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
270
271 let pg_params = Self::to_params(¶ms)?;
272 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
273 pg_params.iter().map(|p| p.as_ref() as _).collect();
274
275 let row = conn
276 .query_one(&sql, ¶m_refs)
277 .await
278 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
279
280 let count: i64 = row.get(0);
281 Ok(count as u64)
282 })
283 }
284}
285
286pub struct PgQueryBuilder<T: Model> {
288 engine: PgEngine,
289 _marker: PhantomData<T>,
290}
291
292impl<T: Model> PgQueryBuilder<T> {
293 pub fn new(engine: PgEngine) -> Self {
295 Self {
296 engine,
297 _marker: PhantomData,
298 }
299 }
300
301 pub fn engine(&self) -> &PgEngine {
303 &self.engine
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 }