prax_postgres/
engine.rs

1//! PostgreSQL query engine implementation.
2
3use std::marker::PhantomData;
4
5use prax_query::QueryResult;
6use prax_query::filter::FilterValue;
7use prax_query::traits::{BoxFuture, Model, QueryEngine};
8use tracing::debug;
9
10use crate::pool::PgPool;
11use crate::types::filter_value_to_sql;
12
13/// PostgreSQL query engine that implements the Prax QueryEngine trait.
14#[derive(Clone)]
15pub struct PgEngine {
16    pool: PgPool,
17}
18
19impl PgEngine {
20    /// Create a new PostgreSQL engine with the given connection pool.
21    pub fn new(pool: PgPool) -> Self {
22        Self { pool }
23    }
24
25    /// Get a reference to the connection pool.
26    pub fn pool(&self) -> &PgPool {
27        &self.pool
28    }
29
30    /// Convert filter values to PostgreSQL parameters.
31    fn to_params(
32        values: &[FilterValue],
33    ) -> Result<Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>, prax_query::QueryError>
34    {
35        values
36            .iter()
37            .map(|v| {
38                filter_value_to_sql(v).map_err(|e| prax_query::QueryError::database(e.to_string()))
39            })
40            .collect()
41    }
42}
43
44impl QueryEngine for PgEngine {
45    fn query_many<T: Model + Send + 'static>(
46        &self,
47        sql: &str,
48        params: Vec<FilterValue>,
49    ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
50        let sql = sql.to_string();
51        Box::pin(async move {
52            debug!(sql = %sql, "Executing query_many");
53
54            let conn = self
55                .pool
56                .get()
57                .await
58                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
59
60            let pg_params = Self::to_params(&params)?;
61            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
62                pg_params.iter().map(|p| p.as_ref() as _).collect();
63
64            let rows = conn
65                .query(&sql, &param_refs)
66                .await
67                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
68
69            // For now, we'll return an empty vec since we need FromPgRow implementation
70            // In practice, this would deserialize rows into T
71            // This is a placeholder - real implementation would use FromPgRow
72            let _ = rows;
73            Ok(Vec::new())
74        })
75    }
76
77    fn query_one<T: Model + Send + 'static>(
78        &self,
79        sql: &str,
80        params: Vec<FilterValue>,
81    ) -> BoxFuture<'_, QueryResult<T>> {
82        let sql = sql.to_string();
83        Box::pin(async move {
84            debug!(sql = %sql, "Executing query_one");
85
86            let conn = self
87                .pool
88                .get()
89                .await
90                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
91
92            let pg_params = Self::to_params(&params)?;
93            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
94                pg_params.iter().map(|p| p.as_ref() as _).collect();
95
96            let _row = conn.query_one(&sql, &param_refs).await.map_err(|e| {
97                if e.to_string().contains("no rows") {
98                    prax_query::QueryError::not_found(T::MODEL_NAME)
99                } else {
100                    prax_query::QueryError::database(e.to_string())
101                }
102            })?;
103
104            // Placeholder - would deserialize row into T
105            Err(prax_query::QueryError::internal(
106                "deserialization not yet implemented".to_string(),
107            ))
108        })
109    }
110
111    fn query_optional<T: Model + Send + 'static>(
112        &self,
113        sql: &str,
114        params: Vec<FilterValue>,
115    ) -> BoxFuture<'_, QueryResult<Option<T>>> {
116        let sql = sql.to_string();
117        Box::pin(async move {
118            debug!(sql = %sql, "Executing query_optional");
119
120            let conn = self
121                .pool
122                .get()
123                .await
124                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
125
126            let pg_params = Self::to_params(&params)?;
127            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
128                pg_params.iter().map(|p| p.as_ref() as _).collect();
129
130            let row = conn
131                .query_opt(&sql, &param_refs)
132                .await
133                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
134
135            match row {
136                Some(_row) => {
137                    // Placeholder - would deserialize row into T
138                    Err(prax_query::QueryError::internal(
139                        "deserialization not yet implemented".to_string(),
140                    ))
141                }
142                None => Ok(None),
143            }
144        })
145    }
146
147    fn execute_insert<T: Model + Send + 'static>(
148        &self,
149        sql: &str,
150        params: Vec<FilterValue>,
151    ) -> BoxFuture<'_, QueryResult<T>> {
152        let sql = sql.to_string();
153        Box::pin(async move {
154            debug!(sql = %sql, "Executing insert");
155
156            let conn = self
157                .pool
158                .get()
159                .await
160                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
161
162            let pg_params = Self::to_params(&params)?;
163            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
164                pg_params.iter().map(|p| p.as_ref() as _).collect();
165
166            let _row = conn
167                .query_one(&sql, &param_refs)
168                .await
169                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
170
171            // Placeholder - would deserialize row into T
172            Err(prax_query::QueryError::internal(
173                "deserialization not yet implemented".to_string(),
174            ))
175        })
176    }
177
178    fn execute_update<T: Model + Send + 'static>(
179        &self,
180        sql: &str,
181        params: Vec<FilterValue>,
182    ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
183        let sql = sql.to_string();
184        Box::pin(async move {
185            debug!(sql = %sql, "Executing update");
186
187            let conn = self
188                .pool
189                .get()
190                .await
191                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
192
193            let pg_params = Self::to_params(&params)?;
194            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
195                pg_params.iter().map(|p| p.as_ref() as _).collect();
196
197            let rows = conn
198                .query(&sql, &param_refs)
199                .await
200                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
201
202            // Placeholder - would deserialize rows into Vec<T>
203            let _ = rows;
204            Ok(Vec::new())
205        })
206    }
207
208    fn execute_delete(
209        &self,
210        sql: &str,
211        params: Vec<FilterValue>,
212    ) -> BoxFuture<'_, QueryResult<u64>> {
213        let sql = sql.to_string();
214        Box::pin(async move {
215            debug!(sql = %sql, "Executing delete");
216
217            let conn = self
218                .pool
219                .get()
220                .await
221                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
222
223            let pg_params = Self::to_params(&params)?;
224            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
225                pg_params.iter().map(|p| p.as_ref() as _).collect();
226
227            let count = conn
228                .execute(&sql, &param_refs)
229                .await
230                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
231
232            Ok(count)
233        })
234    }
235
236    fn execute_raw(&self, sql: &str, params: Vec<FilterValue>) -> BoxFuture<'_, QueryResult<u64>> {
237        let sql = sql.to_string();
238        Box::pin(async move {
239            debug!(sql = %sql, "Executing raw SQL");
240
241            let conn = self
242                .pool
243                .get()
244                .await
245                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
246
247            let pg_params = Self::to_params(&params)?;
248            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
249                pg_params.iter().map(|p| p.as_ref() as _).collect();
250
251            let count = conn
252                .execute(&sql, &param_refs)
253                .await
254                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
255
256            Ok(count)
257        })
258    }
259
260    fn count(&self, sql: &str, params: Vec<FilterValue>) -> BoxFuture<'_, QueryResult<u64>> {
261        let sql = sql.to_string();
262        Box::pin(async move {
263            debug!(sql = %sql, "Executing count");
264
265            let conn = self
266                .pool
267                .get()
268                .await
269                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
270
271            let pg_params = Self::to_params(&params)?;
272            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
273                pg_params.iter().map(|p| p.as_ref() as _).collect();
274
275            let row = conn
276                .query_one(&sql, &param_refs)
277                .await
278                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
279
280            let count: i64 = row.get(0);
281            Ok(count as u64)
282        })
283    }
284}
285
286/// A typed query builder that uses the PostgreSQL engine.
287pub struct PgQueryBuilder<T: Model> {
288    engine: PgEngine,
289    _marker: PhantomData<T>,
290}
291
292impl<T: Model> PgQueryBuilder<T> {
293    /// Create a new query builder.
294    pub fn new(engine: PgEngine) -> Self {
295        Self {
296            engine,
297            _marker: PhantomData,
298        }
299    }
300
301    /// Get the underlying engine.
302    pub fn engine(&self) -> &PgEngine {
303        &self.engine
304    }
305}
306
307#[cfg(test)]
308mod tests {
309    // Integration tests would require a real PostgreSQL database
310}