1use crate::error::SchemaInstallerError;
2use crate::migration::AppliedMigration;
3use crate::tracking::SchemaMigrationDdl;
4use schema_sql_generator::common::generator_type::GeneratorType;
5use sqlx::{postgres::PgPoolOptions, sqlite::SqlitePoolOptions, AnyPool as SqlxAnyPool, Pool, Postgres, Row, Sqlite};
6
7pub enum AnyPool {
8 Postgresql(Pool<Postgres>),
9 Sqlite(Pool<Sqlite>),
10 Any(SqlxAnyPool),
11}
12
13impl AnyPool {
14 pub async fn connect(database_type: &GeneratorType, connection_string: &str) -> Result<Self, SchemaInstallerError> {
15 match database_type {
16 GeneratorType::Postgresql => {
17 let pool = PgPoolOptions::new()
18 .max_connections(5)
19 .connect(connection_string)
20 .await
21 .map_err(|e| SchemaInstallerError::Connection(e.to_string()))?;
22 Ok(AnyPool::Postgresql(pool))
23 }
24 GeneratorType::Sqlite => {
25 let pool = SqlitePoolOptions::new()
26 .max_connections(5)
27 .connect(connection_string)
28 .await
29 .map_err(|e| SchemaInstallerError::Connection(e.to_string()))?;
30 Ok(AnyPool::Sqlite(pool))
31 }
32 GeneratorType::SqlServer => {
33 let pool = sqlx::any::AnyPoolOptions::new()
34 .max_connections(5)
35 .connect(connection_string)
36 .await
37 .map_err(|e| SchemaInstallerError::Connection(e.to_string()))?;
38 Ok(AnyPool::Any(pool))
39 }
40 }
41 }
42
43 pub async fn execute_sql(&self, sql: &str) -> Result<(), SchemaInstallerError> {
44 match self {
45 AnyPool::Postgresql(pool) => {
46 sqlx::query(sql)
47 .execute(pool)
48 .await
49 .map_err(|e| SchemaInstallerError::Execution(e.to_string()))?;
50 Ok(())
51 }
52 AnyPool::Sqlite(pool) => {
53 sqlx::query(sql)
54 .execute(pool)
55 .await
56 .map_err(|e| SchemaInstallerError::Execution(e.to_string()))?;
57 Ok(())
58 }
59 AnyPool::Any(pool) => {
60 sqlx::query(sql)
61 .execute(pool)
62 .await
63 .map_err(|e| SchemaInstallerError::Execution(e.to_string()))?;
64 Ok(())
65 }
66 }
67 }
68
69 pub async fn ensure_migration_table(&self, database_type: &GeneratorType) -> Result<(), SchemaInstallerError> {
70 let ddl = SchemaMigrationDdl::schema_migration_ddl(database_type);
71 self.execute_sql(&ddl).await
72 }
73
74 pub async fn get_applied_migrations(&self) -> Result<Vec<AppliedMigration>, SchemaInstallerError> {
75 match self {
76 AnyPool::Postgresql(pool) => {
77 let rows: Vec<(i64, String, String, String, i32, String, String, String)> =
78 sqlx::query_as(
79 "SELECT id, version, script_path, checksum, execution_time_ms, installed_at, status, tool_version FROM schema_migration ORDER BY installed_at"
80 )
81 .fetch_all(pool)
82 .await
83 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
84
85 Ok(rows
86 .into_iter()
87 .map(
88 |(id, version, script_path, checksum, execution_time_ms, installed_at, status, tool_version)| {
89 AppliedMigration {
90 id,
91 version,
92 script_path,
93 checksum,
94 execution_time_ms: execution_time_ms as i64,
95 installed_at,
96 status,
97 tool_version,
98 }
99 },
100 )
101 .collect())
102 }
103 AnyPool::Sqlite(pool) => {
104 let rows: Vec<(i64, String, String, String, i64, String, String, String)> =
105 sqlx::query_as(
106 "SELECT id, version, script_path, checksum, execution_time_ms, installed_at, status, tool_version FROM schema_migration ORDER BY installed_at"
107 )
108 .fetch_all(pool)
109 .await
110 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
111
112 Ok(rows
113 .into_iter()
114 .map(
115 |(id, version, script_path, checksum, execution_time_ms, installed_at, status, tool_version)| {
116 AppliedMigration {
117 id,
118 version,
119 script_path,
120 checksum,
121 execution_time_ms,
122 installed_at,
123 status,
124 tool_version,
125 }
126 },
127 )
128 .collect())
129 }
130 AnyPool::Any(pool) => {
131 let rows = sqlx::query(
132 "SELECT id, version, script_path, checksum, execution_time_ms, installed_at, status, tool_version FROM schema_migration ORDER BY installed_at"
133 )
134 .fetch_all(pool)
135 .await
136 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
137
138 let migrations = rows
139 .into_iter()
140 .map(|row| {
141 let id: i64 = row.try_get(0).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
142 let version: String = row.try_get(1).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
143 let script_path: String = row.try_get(2).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
144 let checksum: String = row.try_get(3).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
145 let execution_time_ms: i64 = row.try_get(4).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
146 let installed_at: String = row.try_get(5).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
147 let status: String = row.try_get(6).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
148 let tool_version: String = row.try_get(7).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
149
150 Ok(AppliedMigration {
151 id,
152 version,
153 script_path,
154 checksum,
155 execution_time_ms,
156 installed_at,
157 status,
158 tool_version,
159 })
160 })
161 .collect::<Result<Vec<_>, SchemaInstallerError>>()?;
162
163 Ok(migrations)
164 }
165 }
166 }
167
168 pub async fn insert_migration(
169 &self,
170 version: &str,
171 script_path: &str,
172 checksum: &str,
173 execution_time_ms: i64,
174 status: &str,
175 tool_version: &str,
176 ) -> Result<i64, SchemaInstallerError> {
177 match self {
178 AnyPool::Postgresql(pool) => {
179 let row: (i64,) = sqlx::query_as(
180 "INSERT INTO schema_migration (version, script_path, checksum, execution_time_ms, status, tool_version) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
181 )
182 .bind(version)
183 .bind(script_path)
184 .bind(checksum)
185 .bind(execution_time_ms)
186 .bind(status)
187 .bind(tool_version)
188 .fetch_one(pool)
189 .await
190 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
191 Ok(row.0)
192 }
193 AnyPool::Sqlite(pool) => {
194 sqlx::query(
195 "INSERT INTO schema_migration (version, script_path, checksum, execution_time_ms, status, tool_version) VALUES (?, ?, ?, ?, ?, ?)"
196 )
197 .bind(version)
198 .bind(script_path)
199 .bind(checksum)
200 .bind(execution_time_ms)
201 .bind(status)
202 .bind(tool_version)
203 .execute(pool)
204 .await
205 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
206
207 let id: (i64,) = sqlx::query_as("SELECT id FROM schema_migration WHERE version = ? ORDER BY id DESC LIMIT 1")
208 .bind(version)
209 .fetch_one(pool)
210 .await
211 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
212 Ok(id.0)
213 }
214 AnyPool::Any(pool) => {
215 sqlx::query(
216 "INSERT INTO schema_migration (version, script_path, checksum, execution_time_ms, status, tool_version) VALUES (?, ?, ?, ?, ?, ?)"
217 )
218 .bind(version)
219 .bind(script_path)
220 .bind(checksum)
221 .bind(execution_time_ms)
222 .bind(status)
223 .bind(tool_version)
224 .execute(pool)
225 .await
226 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
227
228 let id: (i64,) = sqlx::query_as("SELECT id FROM schema_migration WHERE version = ? ORDER BY id DESC LIMIT 1")
229 .bind(version)
230 .fetch_one(pool)
231 .await
232 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
233 Ok(id.0)
234 }
235 }
236 }
237
238 pub async fn update_migration_status(
239 &self,
240 id: i64,
241 status: &str,
242 execution_time_ms: i64,
243 ) -> Result<(), SchemaInstallerError> {
244 match self {
245 AnyPool::Postgresql(pool) => {
246 sqlx::query("UPDATE schema_migration SET status = $1, execution_time_ms = $2 WHERE id = $3")
247 .bind(status)
248 .bind(execution_time_ms)
249 .bind(id)
250 .execute(pool)
251 .await
252 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
253 Ok(())
254 }
255 AnyPool::Sqlite(pool) => {
256 sqlx::query("UPDATE schema_migration SET status = ?, execution_time_ms = ? WHERE id = ?")
257 .bind(status)
258 .bind(execution_time_ms)
259 .bind(id)
260 .execute(pool)
261 .await
262 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
263 Ok(())
264 }
265 AnyPool::Any(pool) => {
266 sqlx::query("UPDATE schema_migration SET status = ?, execution_time_ms = ? WHERE id = ?")
267 .bind(status)
268 .bind(execution_time_ms)
269 .bind(id)
270 .execute(pool)
271 .await
272 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
273 Ok(())
274 }
275 }
276 }
277
278 pub async fn delete_failed_migrations(&self) -> Result<(), SchemaInstallerError> {
279 match self {
280 AnyPool::Postgresql(pool) => {
281 sqlx::query("DELETE FROM schema_migration WHERE status = $1")
282 .bind("failed")
283 .execute(pool)
284 .await
285 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
286 Ok(())
287 }
288 AnyPool::Sqlite(pool) => {
289 sqlx::query("DELETE FROM schema_migration WHERE status = ?")
290 .bind("failed")
291 .execute(pool)
292 .await
293 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
294 Ok(())
295 }
296 AnyPool::Any(pool) => {
297 sqlx::query("DELETE FROM schema_migration WHERE status = ?")
298 .bind("failed")
299 .execute(pool)
300 .await
301 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
302 Ok(())
303 }
304 }
305 }
306
307 pub async fn update_migration_checksum(
308 &self,
309 id: i64,
310 checksum: &str,
311 ) -> Result<(), SchemaInstallerError> {
312 match self {
313 AnyPool::Postgresql(pool) => {
314 sqlx::query("UPDATE schema_migration SET checksum = $1 WHERE id = $2")
315 .bind(checksum)
316 .bind(id)
317 .execute(pool)
318 .await
319 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
320 Ok(())
321 }
322 AnyPool::Sqlite(pool) => {
323 sqlx::query("UPDATE schema_migration SET checksum = ? WHERE id = ?")
324 .bind(checksum)
325 .bind(id)
326 .execute(pool)
327 .await
328 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
329 Ok(())
330 }
331 AnyPool::Any(pool) => {
332 sqlx::query("UPDATE schema_migration SET checksum = ? WHERE id = ?")
333 .bind(checksum)
334 .bind(id)
335 .execute(pool)
336 .await
337 .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
338 Ok(())
339 }
340 }
341 }
342}