prax_postgres/
engine.rs

1//! PostgreSQL query engine implementation.
2
3use std::marker::PhantomData;
4
5use prax_query::QueryResult;
6use prax_query::filter::FilterValue;
7use prax_query::traits::{BoxFuture, Model, QueryEngine};
8use tracing::debug;
9
10use crate::pool::PgPool;
11use crate::types::filter_value_to_sql;
12
13/// PostgreSQL query engine that implements the Prax QueryEngine trait.
14#[derive(Clone)]
15pub struct PgEngine {
16    pool: PgPool,
17}
18
19impl PgEngine {
20    /// Create a new PostgreSQL engine with the given connection pool.
21    pub fn new(pool: PgPool) -> Self {
22        Self { pool }
23    }
24
25    /// Get a reference to the connection pool.
26    pub fn pool(&self) -> &PgPool {
27        &self.pool
28    }
29
30    /// Convert filter values to PostgreSQL parameters.
31    #[allow(clippy::result_large_err)]
32    fn to_params(
33        values: &[FilterValue],
34    ) -> Result<Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>, prax_query::QueryError>
35    {
36        values
37            .iter()
38            .map(|v| {
39                filter_value_to_sql(v).map_err(|e| prax_query::QueryError::database(e.to_string()))
40            })
41            .collect()
42    }
43}
44
45impl QueryEngine for PgEngine {
46    fn query_many<T: Model + Send + 'static>(
47        &self,
48        sql: &str,
49        params: Vec<FilterValue>,
50    ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
51        let sql = sql.to_string();
52        Box::pin(async move {
53            debug!(sql = %sql, "Executing query_many");
54
55            let conn = self
56                .pool
57                .get()
58                .await
59                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
60
61            let pg_params = Self::to_params(&params)?;
62            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
63                pg_params.iter().map(|p| p.as_ref() as _).collect();
64
65            let rows = conn
66                .query(&sql, &param_refs)
67                .await
68                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
69
70            // For now, we'll return an empty vec since we need FromPgRow implementation
71            // In practice, this would deserialize rows into T
72            // This is a placeholder - real implementation would use FromPgRow
73            let _ = rows;
74            Ok(Vec::new())
75        })
76    }
77
78    fn query_one<T: Model + Send + 'static>(
79        &self,
80        sql: &str,
81        params: Vec<FilterValue>,
82    ) -> BoxFuture<'_, QueryResult<T>> {
83        let sql = sql.to_string();
84        Box::pin(async move {
85            debug!(sql = %sql, "Executing query_one");
86
87            let conn = self
88                .pool
89                .get()
90                .await
91                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
92
93            let pg_params = Self::to_params(&params)?;
94            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
95                pg_params.iter().map(|p| p.as_ref() as _).collect();
96
97            let _row = conn.query_one(&sql, &param_refs).await.map_err(|e| {
98                if e.to_string().contains("no rows") {
99                    prax_query::QueryError::not_found(T::MODEL_NAME)
100                } else {
101                    prax_query::QueryError::database(e.to_string())
102                }
103            })?;
104
105            // Placeholder - would deserialize row into T
106            Err(prax_query::QueryError::internal(
107                "deserialization not yet implemented".to_string(),
108            ))
109        })
110    }
111
112    fn query_optional<T: Model + Send + 'static>(
113        &self,
114        sql: &str,
115        params: Vec<FilterValue>,
116    ) -> BoxFuture<'_, QueryResult<Option<T>>> {
117        let sql = sql.to_string();
118        Box::pin(async move {
119            debug!(sql = %sql, "Executing query_optional");
120
121            let conn = self
122                .pool
123                .get()
124                .await
125                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
126
127            let pg_params = Self::to_params(&params)?;
128            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
129                pg_params.iter().map(|p| p.as_ref() as _).collect();
130
131            let row = conn
132                .query_opt(&sql, &param_refs)
133                .await
134                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
135
136            match row {
137                Some(_row) => {
138                    // Placeholder - would deserialize row into T
139                    Err(prax_query::QueryError::internal(
140                        "deserialization not yet implemented".to_string(),
141                    ))
142                }
143                None => Ok(None),
144            }
145        })
146    }
147
148    fn execute_insert<T: Model + Send + 'static>(
149        &self,
150        sql: &str,
151        params: Vec<FilterValue>,
152    ) -> BoxFuture<'_, QueryResult<T>> {
153        let sql = sql.to_string();
154        Box::pin(async move {
155            debug!(sql = %sql, "Executing insert");
156
157            let conn = self
158                .pool
159                .get()
160                .await
161                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
162
163            let pg_params = Self::to_params(&params)?;
164            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
165                pg_params.iter().map(|p| p.as_ref() as _).collect();
166
167            let _row = conn
168                .query_one(&sql, &param_refs)
169                .await
170                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
171
172            // Placeholder - would deserialize row into T
173            Err(prax_query::QueryError::internal(
174                "deserialization not yet implemented".to_string(),
175            ))
176        })
177    }
178
179    fn execute_update<T: Model + Send + 'static>(
180        &self,
181        sql: &str,
182        params: Vec<FilterValue>,
183    ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
184        let sql = sql.to_string();
185        Box::pin(async move {
186            debug!(sql = %sql, "Executing update");
187
188            let conn = self
189                .pool
190                .get()
191                .await
192                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
193
194            let pg_params = Self::to_params(&params)?;
195            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
196                pg_params.iter().map(|p| p.as_ref() as _).collect();
197
198            let rows = conn
199                .query(&sql, &param_refs)
200                .await
201                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
202
203            // Placeholder - would deserialize rows into Vec<T>
204            let _ = rows;
205            Ok(Vec::new())
206        })
207    }
208
209    fn execute_delete(
210        &self,
211        sql: &str,
212        params: Vec<FilterValue>,
213    ) -> BoxFuture<'_, QueryResult<u64>> {
214        let sql = sql.to_string();
215        Box::pin(async move {
216            debug!(sql = %sql, "Executing delete");
217
218            let conn = self
219                .pool
220                .get()
221                .await
222                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
223
224            let pg_params = Self::to_params(&params)?;
225            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
226                pg_params.iter().map(|p| p.as_ref() as _).collect();
227
228            let count = conn
229                .execute(&sql, &param_refs)
230                .await
231                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
232
233            Ok(count)
234        })
235    }
236
237    fn execute_raw(&self, sql: &str, params: Vec<FilterValue>) -> BoxFuture<'_, QueryResult<u64>> {
238        let sql = sql.to_string();
239        Box::pin(async move {
240            debug!(sql = %sql, "Executing raw SQL");
241
242            let conn = self
243                .pool
244                .get()
245                .await
246                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
247
248            let pg_params = Self::to_params(&params)?;
249            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
250                pg_params.iter().map(|p| p.as_ref() as _).collect();
251
252            let count = conn
253                .execute(&sql, &param_refs)
254                .await
255                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
256
257            Ok(count)
258        })
259    }
260
261    fn count(&self, sql: &str, params: Vec<FilterValue>) -> BoxFuture<'_, QueryResult<u64>> {
262        let sql = sql.to_string();
263        Box::pin(async move {
264            debug!(sql = %sql, "Executing count");
265
266            let conn = self
267                .pool
268                .get()
269                .await
270                .map_err(|e| prax_query::QueryError::connection(e.to_string()))?;
271
272            let pg_params = Self::to_params(&params)?;
273            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
274                pg_params.iter().map(|p| p.as_ref() as _).collect();
275
276            let row = conn
277                .query_one(&sql, &param_refs)
278                .await
279                .map_err(|e| prax_query::QueryError::database(e.to_string()))?;
280
281            let count: i64 = row.get(0);
282            Ok(count as u64)
283        })
284    }
285}
286
287/// A typed query builder that uses the PostgreSQL engine.
288pub struct PgQueryBuilder<T: Model> {
289    engine: PgEngine,
290    _marker: PhantomData<T>,
291}
292
293impl<T: Model> PgQueryBuilder<T> {
294    /// Create a new query builder.
295    pub fn new(engine: PgEngine) -> Self {
296        Self {
297            engine,
298            _marker: PhantomData,
299        }
300    }
301
302    /// Get the underlying engine.
303    pub fn engine(&self) -> &PgEngine {
304        &self.engine
305    }
306}
307
#[cfg(test)]
mod tests {
    // Intentionally empty: exercising the engine needs a real PostgreSQL
    // database, so those checks belong in integration tests.
}