1use std::marker::PhantomData;
4
5use prax_query::QueryResult;
6use prax_query::filter::FilterValue;
7use prax_query::traits::{BoxFuture, Model, QueryEngine};
8use tracing::debug;
9
10use crate::pool::PgPool;
11use crate::types::filter_value_to_sql;
12
13#[derive(Clone)]
15pub struct PgEngine {
16 pool: PgPool,
17}
18
19impl PgEngine {
20 pub fn new(pool: PgPool) -> Self {
22 Self { pool }
23 }
24
25 pub fn pool(&self) -> &PgPool {
27 &self.pool
28 }
29
30 #[allow(clippy::result_large_err)]
32 fn to_params(
33 values: &[FilterValue],
34 ) -> Result<Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>, prax_query::QueryError>
35 {
36 values
37 .iter()
38 .map(|v| {
39 filter_value_to_sql(v).map_err(|e| prax_query::QueryError::database(e.to_string()))
40 })
41 .collect()
42 }
43}
44
45impl QueryEngine for PgEngine {
46 fn query_many<T: Model + Send + 'static>(
47 &self,
48 sql: &str,
49 params: Vec<FilterValue>,
50 ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
51 let sql = sql.to_string();
52 Box::pin(async move {
53 debug!(sql = %sql, "Executing query_many");
54
55 let conn = self
56 .pool
57 .get()
58 .await
59 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
60
61 let pg_params = Self::to_params(¶ms)?;
62 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
63 pg_params.iter().map(|p| p.as_ref() as _).collect();
64
65 let rows = conn
66 .query(&sql, ¶m_refs)
67 .await
68 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
69
70 let _ = rows;
74 Ok(Vec::new())
75 })
76 }
77
78 fn query_one<T: Model + Send + 'static>(
79 &self,
80 sql: &str,
81 params: Vec<FilterValue>,
82 ) -> BoxFuture<'_, QueryResult<T>> {
83 let sql = sql.to_string();
84 Box::pin(async move {
85 debug!(sql = %sql, "Executing query_one");
86
87 let conn = self
88 .pool
89 .get()
90 .await
91 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
92
93 let pg_params = Self::to_params(¶ms)?;
94 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
95 pg_params.iter().map(|p| p.as_ref() as _).collect();
96
97 let _row = conn.query_one(&sql, ¶m_refs).await.map_err(|e| {
98 if e.to_string().contains("no rows") {
99 prax_query::QueryError::not_found(T::MODEL_NAME)
100 } else {
101 prax_query::QueryError::database(e.to_string())
102 }
103 })?;
104
105 Err(prax_query::QueryError::internal(
107 "deserialization not yet implemented".to_string(),
108 ))
109 })
110 }
111
112 fn query_optional<T: Model + Send + 'static>(
113 &self,
114 sql: &str,
115 params: Vec<FilterValue>,
116 ) -> BoxFuture<'_, QueryResult<Option<T>>> {
117 let sql = sql.to_string();
118 Box::pin(async move {
119 debug!(sql = %sql, "Executing query_optional");
120
121 let conn = self
122 .pool
123 .get()
124 .await
125 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
126
127 let pg_params = Self::to_params(¶ms)?;
128 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
129 pg_params.iter().map(|p| p.as_ref() as _).collect();
130
131 let row = conn
132 .query_opt(&sql, ¶m_refs)
133 .await
134 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
135
136 match row {
137 Some(_row) => {
138 Err(prax_query::QueryError::internal(
140 "deserialization not yet implemented".to_string(),
141 ))
142 }
143 None => Ok(None),
144 }
145 })
146 }
147
148 fn execute_insert<T: Model + Send + 'static>(
149 &self,
150 sql: &str,
151 params: Vec<FilterValue>,
152 ) -> BoxFuture<'_, QueryResult<T>> {
153 let sql = sql.to_string();
154 Box::pin(async move {
155 debug!(sql = %sql, "Executing insert");
156
157 let conn = self
158 .pool
159 .get()
160 .await
161 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
162
163 let pg_params = Self::to_params(¶ms)?;
164 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
165 pg_params.iter().map(|p| p.as_ref() as _).collect();
166
167 let _row = conn
168 .query_one(&sql, ¶m_refs)
169 .await
170 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
171
172 Err(prax_query::QueryError::internal(
174 "deserialization not yet implemented".to_string(),
175 ))
176 })
177 }
178
179 fn execute_update<T: Model + Send + 'static>(
180 &self,
181 sql: &str,
182 params: Vec<FilterValue>,
183 ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
184 let sql = sql.to_string();
185 Box::pin(async move {
186 debug!(sql = %sql, "Executing update");
187
188 let conn = self
189 .pool
190 .get()
191 .await
192 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
193
194 let pg_params = Self::to_params(¶ms)?;
195 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
196 pg_params.iter().map(|p| p.as_ref() as _).collect();
197
198 let rows = conn
199 .query(&sql, ¶m_refs)
200 .await
201 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
202
203 let _ = rows;
205 Ok(Vec::new())
206 })
207 }
208
209 fn execute_delete(
210 &self,
211 sql: &str,
212 params: Vec<FilterValue>,
213 ) -> BoxFuture<'_, QueryResult<u64>> {
214 let sql = sql.to_string();
215 Box::pin(async move {
216 debug!(sql = %sql, "Executing delete");
217
218 let conn = self
219 .pool
220 .get()
221 .await
222 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
223
224 let pg_params = Self::to_params(¶ms)?;
225 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
226 pg_params.iter().map(|p| p.as_ref() as _).collect();
227
228 let count = conn
229 .execute(&sql, ¶m_refs)
230 .await
231 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
232
233 Ok(count)
234 })
235 }
236
237 fn execute_raw(&self, sql: &str, params: Vec<FilterValue>) -> BoxFuture<'_, QueryResult<u64>> {
238 let sql = sql.to_string();
239 Box::pin(async move {
240 debug!(sql = %sql, "Executing raw SQL");
241
242 let conn = self
243 .pool
244 .get()
245 .await
246 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
247
248 let pg_params = Self::to_params(¶ms)?;
249 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
250 pg_params.iter().map(|p| p.as_ref() as _).collect();
251
252 let count = conn
253 .execute(&sql, ¶m_refs)
254 .await
255 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
256
257 Ok(count)
258 })
259 }
260
261 fn count(&self, sql: &str, params: Vec<FilterValue>) -> BoxFuture<'_, QueryResult<u64>> {
262 let sql = sql.to_string();
263 Box::pin(async move {
264 debug!(sql = %sql, "Executing count");
265
266 let conn = self
267 .pool
268 .get()
269 .await
270 .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
271
272 let pg_params = Self::to_params(¶ms)?;
273 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
274 pg_params.iter().map(|p| p.as_ref() as _).collect();
275
276 let row = conn
277 .query_one(&sql, ¶m_refs)
278 .await
279 .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
280
281 let count: i64 = row.get(0);
282 Ok(count as u64)
283 })
284 }
285}
286
287pub struct PgQueryBuilder<T: Model> {
289 engine: PgEngine,
290 _marker: PhantomData<T>,
291}
292
293impl<T: Model> PgQueryBuilder<T> {
294 pub fn new(engine: PgEngine) -> Self {
296 Self {
297 engine,
298 _marker: PhantomData,
299 }
300 }
301
302 pub fn engine(&self) -> &PgEngine {
304 &self.engine
305 }
306}
307
308#[cfg(test)]
309mod tests {
310 }