Skip to main content

schema_installer/
connection.rs

1use crate::error::SchemaInstallerError;
2use crate::migration::AppliedMigration;
3use crate::tracking::SchemaMigrationDdl;
4use schema_sql_generator::common::generator_type::GeneratorType;
5use sqlx::{postgres::PgPoolOptions, sqlite::SqlitePoolOptions, AnyPool as SqlxAnyPool, Pool, Postgres, Row, Sqlite};
6
7pub enum AnyPool {
8    Postgresql(Pool<Postgres>),
9    Sqlite(Pool<Sqlite>),
10    Any(SqlxAnyPool),
11}
12
13impl AnyPool {
14    pub async fn connect(database_type: &GeneratorType, connection_string: &str) -> Result<Self, SchemaInstallerError> {
15        match database_type {
16            GeneratorType::Postgresql => {
17                let pool = PgPoolOptions::new()
18                    .max_connections(5)
19                    .connect(connection_string)
20                    .await
21                    .map_err(|e| SchemaInstallerError::Connection(e.to_string()))?;
22                Ok(AnyPool::Postgresql(pool))
23            }
24            GeneratorType::Sqlite => {
25                let pool = SqlitePoolOptions::new()
26                    .max_connections(5)
27                    .connect(connection_string)
28                    .await
29                    .map_err(|e| SchemaInstallerError::Connection(e.to_string()))?;
30                Ok(AnyPool::Sqlite(pool))
31            }
32            GeneratorType::SqlServer => {
33                let pool = sqlx::any::AnyPoolOptions::new()
34                    .max_connections(5)
35                    .connect(connection_string)
36                    .await
37                    .map_err(|e| SchemaInstallerError::Connection(e.to_string()))?;
38                Ok(AnyPool::Any(pool))
39            }
40        }
41    }
42
43    pub async fn execute_sql(&self, sql: &str) -> Result<(), SchemaInstallerError> {
44        match self {
45            AnyPool::Postgresql(pool) => {
46                sqlx::query(sql)
47                    .execute(pool)
48                    .await
49                    .map_err(|e| SchemaInstallerError::Execution(e.to_string()))?;
50                Ok(())
51            }
52            AnyPool::Sqlite(pool) => {
53                sqlx::query(sql)
54                    .execute(pool)
55                    .await
56                    .map_err(|e| SchemaInstallerError::Execution(e.to_string()))?;
57                Ok(())
58            }
59            AnyPool::Any(pool) => {
60                sqlx::query(sql)
61                    .execute(pool)
62                    .await
63                    .map_err(|e| SchemaInstallerError::Execution(e.to_string()))?;
64                Ok(())
65            }
66        }
67    }
68
69    pub async fn ensure_migration_table(&self, database_type: &GeneratorType) -> Result<(), SchemaInstallerError> {
70        let ddl = SchemaMigrationDdl::schema_migration_ddl(database_type);
71        self.execute_sql(&ddl).await
72    }
73
74    pub async fn get_applied_migrations(&self) -> Result<Vec<AppliedMigration>, SchemaInstallerError> {
75        match self {
76            AnyPool::Postgresql(pool) => {
77                let rows: Vec<(i64, String, String, String, i32, String, String, String)> =
78                    sqlx::query_as(
79                        "SELECT id, version, script_path, checksum, execution_time_ms, installed_at, status, tool_version FROM schema_migration ORDER BY installed_at"
80                    )
81                    .fetch_all(pool)
82                    .await
83                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
84
85                Ok(rows
86                    .into_iter()
87                    .map(
88                        |(id, version, script_path, checksum, execution_time_ms, installed_at, status, tool_version)| {
89                            AppliedMigration {
90                                id,
91                                version,
92                                script_path,
93                                checksum,
94                                execution_time_ms: execution_time_ms as i64,
95                                installed_at,
96                                status,
97                                tool_version,
98                            }
99                        },
100                    )
101                    .collect())
102            }
103            AnyPool::Sqlite(pool) => {
104                let rows: Vec<(i64, String, String, String, i64, String, String, String)> =
105                    sqlx::query_as(
106                        "SELECT id, version, script_path, checksum, execution_time_ms, installed_at, status, tool_version FROM schema_migration ORDER BY installed_at"
107                    )
108                    .fetch_all(pool)
109                    .await
110                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
111
112                Ok(rows
113                    .into_iter()
114                    .map(
115                        |(id, version, script_path, checksum, execution_time_ms, installed_at, status, tool_version)| {
116                            AppliedMigration {
117                                id,
118                                version,
119                                script_path,
120                                checksum,
121                                execution_time_ms,
122                                installed_at,
123                                status,
124                                tool_version,
125                            }
126                        },
127                    )
128                    .collect())
129            }
130            AnyPool::Any(pool) => {
131                let rows = sqlx::query(
132                    "SELECT id, version, script_path, checksum, execution_time_ms, installed_at, status, tool_version FROM schema_migration ORDER BY installed_at"
133                )
134                .fetch_all(pool)
135                .await
136                .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
137
138                let migrations = rows
139                    .into_iter()
140                    .map(|row| {
141                        let id: i64 = row.try_get(0).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
142                        let version: String = row.try_get(1).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
143                        let script_path: String = row.try_get(2).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
144                        let checksum: String = row.try_get(3).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
145                        let execution_time_ms: i64 = row.try_get(4).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
146                        let installed_at: String = row.try_get(5).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
147                        let status: String = row.try_get(6).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
148                        let tool_version: String = row.try_get(7).map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
149
150                        Ok(AppliedMigration {
151                            id,
152                            version,
153                            script_path,
154                            checksum,
155                            execution_time_ms,
156                            installed_at,
157                            status,
158                            tool_version,
159                        })
160                    })
161                    .collect::<Result<Vec<_>, SchemaInstallerError>>()?;
162
163                Ok(migrations)
164            }
165        }
166    }
167
168    pub async fn insert_migration(
169        &self,
170        version: &str,
171        script_path: &str,
172        checksum: &str,
173        execution_time_ms: i64,
174        status: &str,
175        tool_version: &str,
176    ) -> Result<i64, SchemaInstallerError> {
177        match self {
178            AnyPool::Postgresql(pool) => {
179                let row: (i64,) = sqlx::query_as(
180                    "INSERT INTO schema_migration (version, script_path, checksum, execution_time_ms, status, tool_version) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
181                )
182                .bind(version)
183                .bind(script_path)
184                .bind(checksum)
185                .bind(execution_time_ms)
186                .bind(status)
187                .bind(tool_version)
188                .fetch_one(pool)
189                .await
190                .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
191                Ok(row.0)
192            }
193            AnyPool::Sqlite(pool) => {
194                sqlx::query(
195                    "INSERT INTO schema_migration (version, script_path, checksum, execution_time_ms, status, tool_version) VALUES (?, ?, ?, ?, ?, ?)"
196                )
197                .bind(version)
198                .bind(script_path)
199                .bind(checksum)
200                .bind(execution_time_ms)
201                .bind(status)
202                .bind(tool_version)
203                .execute(pool)
204                .await
205                .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
206
207                let id: (i64,) = sqlx::query_as("SELECT id FROM schema_migration WHERE version = ? ORDER BY id DESC LIMIT 1")
208                    .bind(version)
209                    .fetch_one(pool)
210                    .await
211                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
212                Ok(id.0)
213            }
214            AnyPool::Any(pool) => {
215                sqlx::query(
216                    "INSERT INTO schema_migration (version, script_path, checksum, execution_time_ms, status, tool_version) VALUES (?, ?, ?, ?, ?, ?)"
217                )
218                .bind(version)
219                .bind(script_path)
220                .bind(checksum)
221                .bind(execution_time_ms)
222                .bind(status)
223                .bind(tool_version)
224                .execute(pool)
225                .await
226                .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
227
228                let id: (i64,) = sqlx::query_as("SELECT id FROM schema_migration WHERE version = ? ORDER BY id DESC LIMIT 1")
229                    .bind(version)
230                    .fetch_one(pool)
231                    .await
232                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
233                Ok(id.0)
234            }
235        }
236    }
237
238    pub async fn update_migration_status(
239        &self,
240        id: i64,
241        status: &str,
242        execution_time_ms: i64,
243    ) -> Result<(), SchemaInstallerError> {
244        match self {
245            AnyPool::Postgresql(pool) => {
246                sqlx::query("UPDATE schema_migration SET status = $1, execution_time_ms = $2 WHERE id = $3")
247                    .bind(status)
248                    .bind(execution_time_ms)
249                    .bind(id)
250                    .execute(pool)
251                    .await
252                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
253                Ok(())
254            }
255            AnyPool::Sqlite(pool) => {
256                sqlx::query("UPDATE schema_migration SET status = ?, execution_time_ms = ? WHERE id = ?")
257                    .bind(status)
258                    .bind(execution_time_ms)
259                    .bind(id)
260                    .execute(pool)
261                    .await
262                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
263                Ok(())
264            }
265            AnyPool::Any(pool) => {
266                sqlx::query("UPDATE schema_migration SET status = ?, execution_time_ms = ? WHERE id = ?")
267                    .bind(status)
268                    .bind(execution_time_ms)
269                    .bind(id)
270                    .execute(pool)
271                    .await
272                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
273                Ok(())
274            }
275        }
276    }
277
278    pub async fn delete_failed_migrations(&self) -> Result<(), SchemaInstallerError> {
279        match self {
280            AnyPool::Postgresql(pool) => {
281                sqlx::query("DELETE FROM schema_migration WHERE status = $1")
282                    .bind("failed")
283                    .execute(pool)
284                    .await
285                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
286                Ok(())
287            }
288            AnyPool::Sqlite(pool) => {
289                sqlx::query("DELETE FROM schema_migration WHERE status = ?")
290                    .bind("failed")
291                    .execute(pool)
292                    .await
293                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
294                Ok(())
295            }
296            AnyPool::Any(pool) => {
297                sqlx::query("DELETE FROM schema_migration WHERE status = ?")
298                    .bind("failed")
299                    .execute(pool)
300                    .await
301                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
302                Ok(())
303            }
304        }
305    }
306
307    pub async fn update_migration_checksum(
308        &self,
309        id: i64,
310        checksum: &str,
311    ) -> Result<(), SchemaInstallerError> {
312        match self {
313            AnyPool::Postgresql(pool) => {
314                sqlx::query("UPDATE schema_migration SET checksum = $1 WHERE id = $2")
315                    .bind(checksum)
316                    .bind(id)
317                    .execute(pool)
318                    .await
319                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
320                Ok(())
321            }
322            AnyPool::Sqlite(pool) => {
323                sqlx::query("UPDATE schema_migration SET checksum = ? WHERE id = ?")
324                    .bind(checksum)
325                    .bind(id)
326                    .execute(pool)
327                    .await
328                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
329                Ok(())
330            }
331            AnyPool::Any(pool) => {
332                sqlx::query("UPDATE schema_migration SET checksum = ? WHERE id = ?")
333                    .bind(checksum)
334                    .bind(id)
335                    .execute(pool)
336                    .await
337                    .map_err(|e| SchemaInstallerError::Database(e.to_string()))?;
338                Ok(())
339            }
340        }
341    }
342}