shared/infrastructure/database/sqlite/
adapter.rs1use async_trait::async_trait;
2use serde::{Serialize, de::DeserializeOwned};
3use sqlx::{FromRow, Pool, Sqlite};
4use std::marker::PhantomData;
5
6use crate::error::CoreError;
7
8use crate::domain::database::DatabaseAdapter;
9use crate::domain::query::{IntoDbFilter, QueryBuilder};
10
11use crate::infrastructure::database::bindings::traits::{IdResult, SqliteInsert};
13
14pub struct SQLiteAdapter<T: Send + Sync> {
15 pool: Pool<Sqlite>,
16 table: String,
17 _phantom: PhantomData<T>,
18}
19
20impl<T: Send + Sync> SQLiteAdapter<T> {
21 pub fn new(pool: &Pool<Sqlite>, table: &str) -> Self {
22 SQLiteAdapter {
23 pool: pool.clone(),
24 table: table.into(),
25 _phantom: PhantomData,
26 }
27 }
28}
29
30#[async_trait]
31impl<T> DatabaseAdapter<T> for SQLiteAdapter<T>
32where
33 T: Send
34 + Sync
35 + Serialize
36 + DeserializeOwned
37 + 'static
38 + for<'r> FromRow<'r, sqlx::sqlite::SqliteRow>
39 + Unpin
40 + SqliteInsert,
41{
42 async fn insert(&self, data: T) -> Result<String, CoreError> {
43 let columns: String = T::columns()
44 .iter()
45 .map(|k| format!("\"{}\"", k))
46 .collect::<Vec<String>>()
47 .join(", ");
48 let values = T::columns()
49 .iter()
50 .enumerate()
51 .map(|(i, _)| format!("${}", i + 1))
52 .collect::<Vec<String>>()
53 .join(", ");
54
55 let sql = format!(
56 "INSERT INTO {} ({}) VALUES ({}) RETURNING id",
57 self.table, columns, values
58 );
59
60 let query = sqlx::query_as::<_, IdResult>(&sql);
61
62 let row: IdResult = data.bind_query(query).fetch_one(&self.pool).await?;
63 Ok(row.id)
64 }
65
66 async fn upsert(&self, data: T) -> Result<String, CoreError> {
67 let columns: String = T::columns()
68 .iter()
69 .map(|k| format!("\"{}\"", k))
70 .collect::<Vec<String>>()
71 .join(", ");
72 let values = T::columns()
73 .iter()
74 .enumerate()
75 .map(|(i, _)| format!("${}", i + 1))
76 .collect::<Vec<String>>()
77 .join(", ");
78 let conflict_vals = T::uniques().join(", ");
79
80 let sql = format!(
81 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET id = excluded.id RETURNING id",
82 self.table, columns, values, conflict_vals
83 );
84
85 let query = sqlx::query_as::<_, IdResult>(&sql);
86
87 let row: IdResult = data.bind_query(query).fetch_one(&self.pool).await?;
88 Ok(row.id)
89 }
90
91 async fn insert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
92 let columns: String = T::columns()
93 .iter()
94 .map(|k| format!("\"{}\"", k))
95 .collect::<Vec<String>>()
96 .join(", ");
97
98 let mut count = 0;
99 let mut rows: Vec<String> = Vec::new();
100 for _ in 0..data.len() {
101 let values = T::columns()
102 .iter()
103 .map(|_| {
104 count += 1;
105 format!("${}", count)
106 })
107 .collect::<Vec<String>>()
108 .join(", ");
109 rows.push(format!("({values})"));
110 }
111 let placeholders = rows.join(", ");
112
113 let sql = format!(
114 "INSERT INTO {} ({}) VALUES {} RETURNING id",
115 self.table, columns, placeholders
116 );
117
118 let query = sqlx::query_as::<_, IdResult>(&sql);
119 let query = data.into_iter().fold(query, |q, d| d.bind_query(q));
120
121 let rows: Vec<IdResult> = query.fetch_all(&self.pool).await?;
122 Ok(rows.into_iter().map(|r| r.id).collect())
123 }
124 async fn upsert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
125 let columns: String = T::columns()
126 .iter()
127 .map(|k| format!("\"{}\"", k))
128 .collect::<Vec<String>>()
129 .join(", ");
130 let conflict_vals = T::uniques().join(", ");
131
132 let mut count = 0;
133 let mut rows: Vec<String> = Vec::new();
134 for _ in 0..data.len() {
135 let row = T::columns()
136 .iter()
137 .map(|_| {
138 count += 1;
139 format!("${}", count)
140 })
141 .collect::<Vec<String>>()
142 .join(", ");
143 rows.push(format!("({row})"));
144 }
145 let placeholders = rows.join(", ");
146
147 let sql = format!(
148 "INSERT INTO {} ({}) VALUES {} ON CONFLICT ({}) DO UPDATE SET id = excluded.id RETURNING id",
149 self.table, columns, placeholders, conflict_vals
150 );
151
152 let query = sqlx::query_as::<_, IdResult>(&sql);
153 let query = data.into_iter().fold(query, |q, d| d.bind_query(q));
154
155 let rows: Vec<IdResult> = query.fetch_all(&self.pool).await?;
156 Ok(rows.into_iter().map(|r| r.id).collect())
157 }
158
159 async fn find_all(&self, filter: QueryBuilder) -> Result<Vec<T>, CoreError> {
160 let (where_clause, values) = filter.into_sqlite_filter();
161
162 let sql = format!("SELECT * FROM {} WHERE {}", self.table, where_clause);
163 let mut query = sqlx::query_as::<_, T>(&sql);
164 for v in values {
165 query = v.bind_sqlite(query);
166 }
167
168 query.fetch_all(&self.pool).await.map_err(Into::into)
169 }
170
171 async fn find_one(&self, query: QueryBuilder) -> Result<Option<T>, CoreError> {
172 let (where_clause, values) = query.into_sqlite_filter();
173
174 let sql = format!("SELECT * FROM {} WHERE {}", self.table, where_clause);
175 let mut query = sqlx::query_as::<_, T>(&sql);
176 for v in values {
177 query = v.bind_sqlite(query);
178 }
179
180 query.fetch_optional(&self.pool).await.map_err(Into::into)
181 }
182
183 async fn find_one_and_update(
184 &self,
185 filter: QueryBuilder,
186 update: QueryBuilder,
187 ) -> Result<Option<T>, CoreError> {
188 let (where_clause, filter_values) = filter.into_sqlite_filter();
189 let (set_clause, update_values) = update.into_sqlite_update();
190
191 let sql = format!(
192 "UPDATE {} SET {} WHERE {} RETURNING *",
193 self.table, set_clause, where_clause
194 );
195 let mut query = sqlx::query_as::<_, T>(&sql);
196
197 for v in update_values.into_iter().chain(filter_values) {
198 query = v.bind_sqlite(query);
199 }
200
201 query.fetch_optional(&self.pool).await.map_err(Into::into)
202 }
203
204 async fn update_many(
205 &self,
206 filter: QueryBuilder,
207 update: QueryBuilder,
208 ) -> Result<(), CoreError> {
209 let (set_clause, update_values) = update.into_sqlite_update();
210 let (where_clause, filter_values) = filter.into_sqlite_filter();
211
212 let sql = format!(
213 "UPDATE {} SET {} WHERE {} RETURNING *",
214 self.table, set_clause, where_clause
215 );
216 let mut query = sqlx::query_as::<_, T>(&sql);
217
218 for v in update_values.into_iter().chain(filter_values) {
219 query = v.bind_sqlite(query);
220 }
221
222 query.fetch_optional(&self.pool).await?;
223 Ok(())
224 }
225
226 async fn delete_one(&self, filter: QueryBuilder) -> Result<(), CoreError> {
227 let (where_clause, values) = filter.into_sqlite_filter();
228
229 let sql = format!("DELETE FROM {} WHERE {}", self.table, where_clause);
230 let mut query = sqlx::query_as::<_, T>(&sql);
231 for v in values {
232 query = v.bind_sqlite(query);
233 }
234
235 query.fetch_optional(&self.pool).await?;
236 Ok(())
237 }
238 async fn delete_many(&self, filter: QueryBuilder) -> Result<(), CoreError> {
239 let (where_clause, values) = filter.into_sqlite_filter();
240
241 let sql = format!("DELETE FROM {} WHERE {}", self.table, where_clause);
242 let mut query = sqlx::query_as::<_, T>(&sql);
243 for v in values {
244 query = v.bind_sqlite(query);
245 }
246
247 query.fetch_optional(&self.pool).await?;
248 Ok(())
249 }
250}