Skip to main content

shared/infrastructure/database/sqlite/
adapter.rs

1use async_trait::async_trait;
2use serde::{Serialize, de::DeserializeOwned};
3use sqlx::{FromRow, Pool, Sqlite};
4use std::marker::PhantomData;
5
6use crate::error::CoreError;
7
8use crate::domain::database::DatabaseAdapter;
9use crate::domain::query::{IntoDbFilter, QueryBuilder};
10
11// FIXME simplify the import
12use crate::infrastructure::database::bindings::traits::{IdResult, SqliteInsert};
13
14pub struct SQLiteAdapter<T: Send + Sync> {
15    pool: Pool<Sqlite>,
16    table: String,
17    _phantom: PhantomData<T>,
18}
19
20impl<T: Send + Sync> SQLiteAdapter<T> {
21    pub fn new(pool: &Pool<Sqlite>, table: &str) -> Self {
22        SQLiteAdapter {
23            pool: pool.clone(),
24            table: table.into(),
25            _phantom: PhantomData,
26        }
27    }
28}
29
30#[async_trait]
31impl<T> DatabaseAdapter<T> for SQLiteAdapter<T>
32where
33    T: Send
34        + Sync
35        + Serialize
36        + DeserializeOwned
37        + 'static
38        + for<'r> FromRow<'r, sqlx::sqlite::SqliteRow>
39        + Unpin
40        + SqliteInsert,
41{
42    async fn insert(&self, data: T) -> Result<String, CoreError> {
43        let columns: String = T::columns()
44            .iter()
45            .map(|k| format!("\"{}\"", k))
46            .collect::<Vec<String>>()
47            .join(", ");
48        let values = T::columns()
49            .iter()
50            .enumerate()
51            .map(|(i, _)| format!("${}", i + 1))
52            .collect::<Vec<String>>()
53            .join(", ");
54
55        let sql = format!(
56            "INSERT INTO {} ({}) VALUES ({}) RETURNING id",
57            self.table, columns, values
58        );
59
60        let query = sqlx::query_as::<_, IdResult>(&sql);
61
62        let row: IdResult = data.bind_query(query).fetch_one(&self.pool).await?;
63        Ok(row.id)
64    }
65
66    async fn upsert(&self, data: T) -> Result<String, CoreError> {
67        let columns: String = T::columns()
68            .iter()
69            .map(|k| format!("\"{}\"", k))
70            .collect::<Vec<String>>()
71            .join(", ");
72        let values = T::columns()
73            .iter()
74            .enumerate()
75            .map(|(i, _)| format!("${}", i + 1))
76            .collect::<Vec<String>>()
77            .join(", ");
78        let conflict_vals = T::uniques().join(", ");
79
80        let sql = format!(
81            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET id = excluded.id RETURNING id",
82            self.table, columns, values, conflict_vals
83        );
84
85        let query = sqlx::query_as::<_, IdResult>(&sql);
86
87        let row: IdResult = data.bind_query(query).fetch_one(&self.pool).await?;
88        Ok(row.id)
89    }
90
91    async fn insert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
92        let columns: String = T::columns()
93            .iter()
94            .map(|k| format!("\"{}\"", k))
95            .collect::<Vec<String>>()
96            .join(", ");
97
98        let mut count = 0;
99        let mut rows: Vec<String> = Vec::new();
100        for _ in 0..data.len() {
101            let values = T::columns()
102                .iter()
103                .map(|_| {
104                    count += 1;
105                    format!("${}", count)
106                })
107                .collect::<Vec<String>>()
108                .join(", ");
109            rows.push(format!("({values})"));
110        }
111        let placeholders = rows.join(", ");
112
113        let sql = format!(
114            "INSERT INTO {} ({}) VALUES {} RETURNING id",
115            self.table, columns, placeholders
116        );
117
118        let query = sqlx::query_as::<_, IdResult>(&sql);
119        let query = data.into_iter().fold(query, |q, d| d.bind_query(q));
120
121        let rows: Vec<IdResult> = query.fetch_all(&self.pool).await?;
122        Ok(rows.into_iter().map(|r| r.id).collect())
123    }
124    async fn upsert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
125        let columns: String = T::columns()
126            .iter()
127            .map(|k| format!("\"{}\"", k))
128            .collect::<Vec<String>>()
129            .join(", ");
130        let conflict_vals = T::uniques().join(", ");
131
132        let mut count = 0;
133        let mut rows: Vec<String> = Vec::new();
134        for _ in 0..data.len() {
135            let row = T::columns()
136                .iter()
137                .map(|_| {
138                    count += 1;
139                    format!("${}", count)
140                })
141                .collect::<Vec<String>>()
142                .join(", ");
143            rows.push(format!("({row})"));
144        }
145        let placeholders = rows.join(", ");
146
147        let sql = format!(
148            "INSERT INTO {} ({}) VALUES {} ON CONFLICT ({}) DO UPDATE SET id = excluded.id RETURNING id",
149            self.table, columns, placeholders, conflict_vals
150        );
151
152        let query = sqlx::query_as::<_, IdResult>(&sql);
153        let query = data.into_iter().fold(query, |q, d| d.bind_query(q));
154
155        let rows: Vec<IdResult> = query.fetch_all(&self.pool).await?;
156        Ok(rows.into_iter().map(|r| r.id).collect())
157    }
158
159    async fn find_all(&self, filter: QueryBuilder) -> Result<Vec<T>, CoreError> {
160        let (where_clause, values) = filter.into_sqlite_filter();
161
162        let sql = format!("SELECT * FROM {} WHERE {}", self.table, where_clause);
163        let mut query = sqlx::query_as::<_, T>(&sql);
164        for v in values {
165            query = v.bind_sqlite(query);
166        }
167
168        query.fetch_all(&self.pool).await.map_err(Into::into)
169    }
170
171    async fn find_one(&self, query: QueryBuilder) -> Result<Option<T>, CoreError> {
172        let (where_clause, values) = query.into_sqlite_filter();
173
174        let sql = format!("SELECT * FROM {} WHERE {}", self.table, where_clause);
175        let mut query = sqlx::query_as::<_, T>(&sql);
176        for v in values {
177            query = v.bind_sqlite(query);
178        }
179
180        query.fetch_optional(&self.pool).await.map_err(Into::into)
181    }
182
183    async fn find_one_and_update(
184        &self,
185        filter: QueryBuilder,
186        update: QueryBuilder,
187    ) -> Result<Option<T>, CoreError> {
188        let (where_clause, filter_values) = filter.into_sqlite_filter();
189        let (set_clause, update_values) = update.into_sqlite_update();
190
191        let sql = format!(
192            "UPDATE {} SET {} WHERE {} RETURNING *",
193            self.table, set_clause, where_clause
194        );
195        let mut query = sqlx::query_as::<_, T>(&sql);
196
197        for v in update_values.into_iter().chain(filter_values) {
198            query = v.bind_sqlite(query);
199        }
200
201        query.fetch_optional(&self.pool).await.map_err(Into::into)
202    }
203
204    async fn update_many(
205        &self,
206        filter: QueryBuilder,
207        update: QueryBuilder,
208    ) -> Result<(), CoreError> {
209        let (set_clause, update_values) = update.into_sqlite_update();
210        let (where_clause, filter_values) = filter.into_sqlite_filter();
211
212        let sql = format!(
213            "UPDATE {} SET {} WHERE {} RETURNING *",
214            self.table, set_clause, where_clause
215        );
216        let mut query = sqlx::query_as::<_, T>(&sql);
217
218        for v in update_values.into_iter().chain(filter_values) {
219            query = v.bind_sqlite(query);
220        }
221
222        query.fetch_optional(&self.pool).await?;
223        Ok(())
224    }
225
226    async fn delete_one(&self, filter: QueryBuilder) -> Result<(), CoreError> {
227        let (where_clause, values) = filter.into_sqlite_filter();
228
229        let sql = format!("DELETE FROM {} WHERE {}", self.table, where_clause);
230        let mut query = sqlx::query_as::<_, T>(&sql);
231        for v in values {
232            query = v.bind_sqlite(query);
233        }
234
235        query.fetch_optional(&self.pool).await?;
236        Ok(())
237    }
238    async fn delete_many(&self, filter: QueryBuilder) -> Result<(), CoreError> {
239        let (where_clause, values) = filter.into_sqlite_filter();
240
241        let sql = format!("DELETE FROM {} WHERE {}", self.table, where_clause);
242        let mut query = sqlx::query_as::<_, T>(&sql);
243        for v in values {
244            query = v.bind_sqlite(query);
245        }
246
247        query.fetch_optional(&self.pool).await?;
248        Ok(())
249    }
250}