shared/infrastructure/database/postgres/
adapter.rs1use async_trait::async_trait;
2use serde::{Serialize, de::DeserializeOwned};
3use sqlx::{FromRow, Pool, Postgres};
4use std::marker::PhantomData;
5
6use crate::error::CoreError;
7
8use crate::domain::database::DatabaseAdapter;
9use crate::domain::query::{IntoDbFilter, QueryBuilder};
10
11use crate::infrastructure::database::bindings::traits::{IdResult, PgInsert};
13
14pub struct PostgreSQLAdapter<T: Send + Sync> {
15 pool: Pool<Postgres>,
16 table: String,
17 _phantom: PhantomData<T>,
18}
19
20impl<T: Send + Sync> PostgreSQLAdapter<T> {
21 pub fn new(pool: &Pool<Postgres>, table: &str) -> Self {
22 PostgreSQLAdapter {
23 pool: pool.clone(),
24 table: table.into(),
25 _phantom: PhantomData,
26 }
27 }
28}
29
30#[async_trait]
31impl<T> DatabaseAdapter<T> for PostgreSQLAdapter<T>
32where
33 T: Send
34 + Sync
35 + Serialize
36 + DeserializeOwned
37 + 'static
38 + for<'r> FromRow<'r, sqlx::postgres::PgRow>
39 + Unpin
40 + PgInsert,
41{
42 async fn insert(&self, data: T) -> Result<String, CoreError> {
44 let columns: String = T::columns()
45 .iter()
46 .map(|k| format!("\"{}\"", k))
47 .collect::<Vec<String>>()
48 .join(", ");
49 let values = T::columns()
50 .iter()
51 .enumerate()
52 .map(|(i, _)| format!("${}", i + 1))
53 .collect::<Vec<String>>()
54 .join(", ");
55
56 let sql = format!(
57 "INSERT INTO {} ({}) VALUES ({}) RETURNING id",
58 self.table, columns, values
59 );
60
61 let query = sqlx::query_as::<_, IdResult>(&sql);
62
63 let row: IdResult = data.bind_query(query).fetch_one(&self.pool).await?;
64 Ok(row.id)
65 }
66
67 async fn insert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
68 let columns: String = T::columns()
69 .iter()
70 .map(|k| format!("\"{}\"", k))
71 .collect::<Vec<String>>()
72 .join(", ");
73
74 let mut count = 0;
75 let mut rows: Vec<String> = Vec::new();
76 for _ in 0..data.len() {
77 let values = T::columns()
78 .iter()
79 .map(|_| {
80 count += 1;
81 format!("${}", count)
82 })
83 .collect::<Vec<String>>()
84 .join(", ");
85 rows.push(format!("({values})"));
86 }
87 let placeholders = rows.join(", ");
88
89 let sql = format!(
102 "INSERT INTO {} ({}) VALUES {} RETURNING id",
103 self.table, columns, placeholders
104 );
105
106 let query = sqlx::query_as::<_, IdResult>(&sql);
107 let query = data.into_iter().fold(query, |q, d| d.bind_query(q));
108
109 let rows: Vec<IdResult> = query.fetch_all(&self.pool).await?;
110 Ok(rows.into_iter().map(|r| r.id).collect())
111 }
112
113 async fn upsert(&self, data: T) -> Result<String, CoreError> {
114 let columns: String = T::columns()
115 .iter()
116 .map(|k| format!("\"{}\"", k))
117 .collect::<Vec<String>>()
118 .join(", ");
119 let values = T::columns()
120 .iter()
121 .enumerate()
122 .map(|(i, _)| format!("${}", i + 1))
123 .collect::<Vec<String>>()
124 .join(", ");
125 let conflict_vals = T::uniques().join(", ");
126
127 let sql = format!(
128 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET id = {}.id RETURNING id",
129 self.table, columns, values, conflict_vals, self.table
130 );
131
132 let query = sqlx::query_as::<_, IdResult>(&sql);
133
134 let row: IdResult = data.bind_query(query).fetch_one(&self.pool).await?;
135 Ok(row.id)
136 }
137
138 async fn upsert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
139 let columns: String = T::columns()
140 .iter()
141 .map(|k| format!("\"{}\"", k))
142 .collect::<Vec<String>>()
143 .join(", ");
144 let conflict_vals = T::uniques().join(", ");
145
146 let mut count = 0;
147 let mut rows: Vec<String> = Vec::new();
148 for _ in 0..data.len() {
149 let row = T::columns()
150 .iter()
151 .map(|_| {
152 count += 1;
153 format!("${}", count)
154 })
155 .collect::<Vec<String>>()
156 .join(", ");
157 rows.push(format!("({row})"));
158 }
159 let placeholders = rows.join(", ");
160
161 let sql = format!(
162 "INSERT INTO {} ({}) VALUES {} ON CONFLICT ({}) DO UPDATE SET id = {}.id RETURNING id",
163 self.table, columns, placeholders, conflict_vals, self.table
164 );
165
166 let query = sqlx::query_as::<_, IdResult>(&sql);
167 let query = data.into_iter().fold(query, |q, d| d.bind_query(q));
168
169 let rows: Vec<IdResult> = query.fetch_all(&self.pool).await?;
170 Ok(rows.into_iter().map(|r| r.id).collect())
171 }
172
173 async fn find_all(&self, filter: QueryBuilder) -> Result<Vec<T>, CoreError> {
175 let (where_clause, values) = filter.into_postgres_filter(0);
176
177 let sql = format!("SELECT * FROM {} WHERE {}", self.table, where_clause);
178
179 let mut query = sqlx::query_as::<_, T>(&sql);
180 for v in values {
181 query = v.bind_pg(query);
182 }
183
184 query.fetch_all(&self.pool).await.map_err(Into::into)
185 }
186
187 async fn find_one(&self, filter: QueryBuilder) -> Result<Option<T>, CoreError> {
188 let (where_clause, values) = filter.into_postgres_filter(0);
189
190 let sql = format!("SELECT * FROM {} WHERE {}", self.table, where_clause);
197
198 let mut query = sqlx::query_as::<_, T>(&sql);
199 for v in values {
200 query = v.bind_pg(query);
201 }
202
203 query.fetch_optional(&self.pool).await.map_err(Into::into)
204 }
205
206 async fn find_one_and_update(
207 &self,
208 filter: QueryBuilder,
209 update: QueryBuilder,
210 ) -> Result<Option<T>, CoreError> {
211 let (set_clause, update_values) = update.into_postgres_update();
212 let offset = update_values.len();
213 let (where_clause, filter_values) = filter.into_postgres_filter(offset);
214
215 let sql = format!(
216 "UPDATE {} SET {} WHERE {} RETURNING *",
217 self.table, set_clause, where_clause
218 );
219
220 let mut query = sqlx::query_as::<_, T>(&sql);
221 for v in update_values.into_iter().chain(filter_values) {
222 query = v.bind_pg(query);
223 }
224
225 query.fetch_optional(&self.pool).await.map_err(Into::into)
226 }
227
228 async fn update_many(
229 &self,
230 filter: QueryBuilder,
231 update: QueryBuilder,
232 ) -> Result<(), CoreError> {
233 let (set_clause, update_values) = update.into_postgres_update();
234 let offset = update_values.len();
235 let (where_clause, filter_values) = filter.into_postgres_filter(offset);
236
237 let sql = format!(
238 "UPDATE {} SET {} WHERE {} RETURNING *",
239 self.table, set_clause, where_clause
240 );
241
242 let mut query = sqlx::query_as::<_, T>(&sql);
243 for v in update_values.into_iter().chain(filter_values) {
244 query = v.bind_pg(query);
245 }
246
247 query.fetch_optional(&self.pool).await?;
248 Ok(())
249 }
250
251 async fn delete_one(&self, filter: QueryBuilder) -> Result<(), CoreError> {
252 let (where_clause, values) = filter.into_postgres_filter(0);
253
254 let sql = format!("DELETE FROM {} WHERE {}", self.table, where_clause);
255 let mut query = sqlx::query_as::<_, T>(&sql);
256 for v in values {
257 query = v.bind_pg(query);
258 }
259
260 query.fetch_optional(&self.pool).await?;
261 Ok(())
262 }
263 async fn delete_many(&self, filter: QueryBuilder) -> Result<(), CoreError> {
264 let (where_clause, values) = filter.into_postgres_filter(0);
265
266 let sql = format!("DELETE FROM {} WHERE {}", self.table, where_clause);
267 let mut query = sqlx::query_as::<_, T>(&sql);
268 for v in values {
269 query = v.bind_pg(query);
270 }
271
272 query.fetch_optional(&self.pool).await?;
273 Ok(())
274 }
275}