Skip to main content

shared/infrastructure/database/postgres/
adapter.rs

1use async_trait::async_trait;
2use serde::{Serialize, de::DeserializeOwned};
3use sqlx::{FromRow, Pool, Postgres};
4use std::marker::PhantomData;
5
6use crate::error::CoreError;
7
8use crate::domain::database::DatabaseAdapter;
9use crate::domain::query::{IntoDbFilter, QueryBuilder};
10
11//FIXME simplify the imports, its local
12use crate::infrastructure::database::bindings::traits::{IdResult, PgInsert};
13
14pub struct PostgreSQLAdapter<T: Send + Sync> {
15    pool: Pool<Postgres>,
16    table: String,
17    _phantom: PhantomData<T>,
18}
19
20impl<T: Send + Sync> PostgreSQLAdapter<T> {
21    pub fn new(pool: &Pool<Postgres>, table: &str) -> Self {
22        PostgreSQLAdapter {
23            pool: pool.clone(),
24            table: table.into(),
25            _phantom: PhantomData,
26        }
27    }
28}
29
30#[async_trait]
31impl<T> DatabaseAdapter<T> for PostgreSQLAdapter<T>
32where
33    T: Send
34        + Sync
35        + Serialize
36        + DeserializeOwned
37        + 'static
38        + for<'r> FromRow<'r, sqlx::postgres::PgRow>
39        + Unpin
40        + PgInsert,
41{
42    // -- INSERTS
43    async fn insert(&self, data: T) -> Result<String, CoreError> {
44        let columns: String = T::columns()
45            .iter()
46            .map(|k| format!("\"{}\"", k))
47            .collect::<Vec<String>>()
48            .join(", ");
49        let values = T::columns()
50            .iter()
51            .enumerate()
52            .map(|(i, _)| format!("${}", i + 1))
53            .collect::<Vec<String>>()
54            .join(", ");
55
56        let sql = format!(
57            "INSERT INTO {} ({}) VALUES ({}) RETURNING id",
58            self.table, columns, values
59        );
60
61        let query = sqlx::query_as::<_, IdResult>(&sql);
62
63        let row: IdResult = data.bind_query(query).fetch_one(&self.pool).await?;
64        Ok(row.id)
65    }
66
67    async fn insert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
68        let columns: String = T::columns()
69            .iter()
70            .map(|k| format!("\"{}\"", k))
71            .collect::<Vec<String>>()
72            .join(", ");
73
74        let mut count = 0;
75        let mut rows: Vec<String> = Vec::new();
76        for _ in 0..data.len() {
77            let values = T::columns()
78                .iter()
79                .map(|_| {
80                    count += 1;
81                    format!("${}", count)
82                })
83                .collect::<Vec<String>>()
84                .join(", ");
85            rows.push(format!("({values})"));
86        }
87        let placeholders = rows.join(", ");
88
89        // let col_count = T::columns().len();
90        // let placeholders: String = (0..data.len())
91        //     .map(|i| {
92        //         let row = (0..col_count)
93        //             .map(|j| format!("${}", i * col_count + j + 1))
94        //             .collect::<Vec<String>>()
95        //             .join(", ");
96        //         format!("({row})")
97        //     })
98        //     .collect::<Vec<String>>()
99        //     .join(", ");
100
101        let sql = format!(
102            "INSERT INTO {} ({}) VALUES {} RETURNING id",
103            self.table, columns, placeholders
104        );
105
106        let query = sqlx::query_as::<_, IdResult>(&sql);
107        let query = data.into_iter().fold(query, |q, d| d.bind_query(q));
108
109        let rows: Vec<IdResult> = query.fetch_all(&self.pool).await?;
110        Ok(rows.into_iter().map(|r| r.id).collect())
111    }
112
113    async fn upsert(&self, data: T) -> Result<String, CoreError> {
114        let columns: String = T::columns()
115            .iter()
116            .map(|k| format!("\"{}\"", k))
117            .collect::<Vec<String>>()
118            .join(", ");
119        let values = T::columns()
120            .iter()
121            .enumerate()
122            .map(|(i, _)| format!("${}", i + 1))
123            .collect::<Vec<String>>()
124            .join(", ");
125        let conflict_vals = T::uniques().join(", ");
126
127        let sql = format!(
128            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET id = {}.id RETURNING id",
129            self.table, columns, values, conflict_vals, self.table
130        );
131
132        let query = sqlx::query_as::<_, IdResult>(&sql);
133
134        let row: IdResult = data.bind_query(query).fetch_one(&self.pool).await?;
135        Ok(row.id)
136    }
137
138    async fn upsert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
139        let columns: String = T::columns()
140            .iter()
141            .map(|k| format!("\"{}\"", k))
142            .collect::<Vec<String>>()
143            .join(", ");
144        let conflict_vals = T::uniques().join(", ");
145
146        let mut count = 0;
147        let mut rows: Vec<String> = Vec::new();
148        for _ in 0..data.len() {
149            let row = T::columns()
150                .iter()
151                .map(|_| {
152                    count += 1;
153                    format!("${}", count)
154                })
155                .collect::<Vec<String>>()
156                .join(", ");
157            rows.push(format!("({row})"));
158        }
159        let placeholders = rows.join(", ");
160
161        let sql = format!(
162            "INSERT INTO {} ({}) VALUES {} ON CONFLICT ({}) DO UPDATE SET id = {}.id RETURNING id",
163            self.table, columns, placeholders, conflict_vals, self.table
164        );
165
166        let query = sqlx::query_as::<_, IdResult>(&sql);
167        let query = data.into_iter().fold(query, |q, d| d.bind_query(q));
168
169        let rows: Vec<IdResult> = query.fetch_all(&self.pool).await?;
170        Ok(rows.into_iter().map(|r| r.id).collect())
171    }
172
173    // -- FINDS
174    async fn find_all(&self, filter: QueryBuilder) -> Result<Vec<T>, CoreError> {
175        let (where_clause, values) = filter.into_postgres_filter(0);
176
177        let sql = format!("SELECT * FROM {} WHERE {}", self.table, where_clause);
178
179        let mut query = sqlx::query_as::<_, T>(&sql);
180        for v in values {
181            query = v.bind_pg(query);
182        }
183
184        query.fetch_all(&self.pool).await.map_err(Into::into)
185    }
186
187    async fn find_one(&self, filter: QueryBuilder) -> Result<Option<T>, CoreError> {
188        let (where_clause, values) = filter.into_postgres_filter(0);
189
190        // SELECT DISTINCT t1.name
191        // FROM {self.table1} t1
192        // JOIN {self.table2} t2 ON t2."permissionId" = t1.id
193        // JOIN {self.table3} t3 ON t3."roleId" = t2."roleId"
194        // WHERE t3."userId" = $1
195
196        let sql = format!("SELECT * FROM {} WHERE {}", self.table, where_clause);
197
198        let mut query = sqlx::query_as::<_, T>(&sql);
199        for v in values {
200            query = v.bind_pg(query);
201        }
202
203        query.fetch_optional(&self.pool).await.map_err(Into::into)
204    }
205
206    async fn find_one_and_update(
207        &self,
208        filter: QueryBuilder,
209        update: QueryBuilder,
210    ) -> Result<Option<T>, CoreError> {
211        let (set_clause, update_values) = update.into_postgres_update();
212        let offset = update_values.len();
213        let (where_clause, filter_values) = filter.into_postgres_filter(offset);
214
215        let sql = format!(
216            "UPDATE {} SET {} WHERE {} RETURNING *",
217            self.table, set_clause, where_clause
218        );
219
220        let mut query = sqlx::query_as::<_, T>(&sql);
221        for v in update_values.into_iter().chain(filter_values) {
222            query = v.bind_pg(query);
223        }
224
225        query.fetch_optional(&self.pool).await.map_err(Into::into)
226    }
227
228    async fn update_many(
229        &self,
230        filter: QueryBuilder,
231        update: QueryBuilder,
232    ) -> Result<(), CoreError> {
233        let (set_clause, update_values) = update.into_postgres_update();
234        let offset = update_values.len();
235        let (where_clause, filter_values) = filter.into_postgres_filter(offset);
236
237        let sql = format!(
238            "UPDATE {} SET {} WHERE {} RETURNING *",
239            self.table, set_clause, where_clause
240        );
241
242        let mut query = sqlx::query_as::<_, T>(&sql);
243        for v in update_values.into_iter().chain(filter_values) {
244            query = v.bind_pg(query);
245        }
246
247        query.fetch_optional(&self.pool).await?;
248        Ok(())
249    }
250
251    async fn delete_one(&self, filter: QueryBuilder) -> Result<(), CoreError> {
252        let (where_clause, values) = filter.into_postgres_filter(0);
253
254        let sql = format!("DELETE FROM {} WHERE {}", self.table, where_clause);
255        let mut query = sqlx::query_as::<_, T>(&sql);
256        for v in values {
257            query = v.bind_pg(query);
258        }
259
260        query.fetch_optional(&self.pool).await?;
261        Ok(())
262    }
263    async fn delete_many(&self, filter: QueryBuilder) -> Result<(), CoreError> {
264        let (where_clause, values) = filter.into_postgres_filter(0);
265
266        let sql = format!("DELETE FROM {} WHERE {}", self.table, where_clause);
267        let mut query = sqlx::query_as::<_, T>(&sql);
268        for v in values {
269            query = v.bind_pg(query);
270        }
271
272        query.fetch_optional(&self.pool).await?;
273        Ok(())
274    }
275}