1use std::error::Error;
2
3use sqlx::{Database, Executor, Pool};
4
/// A single schema migration supplied by the caller.
///
/// Equality compares all four fields, which makes migrations easy to assert
/// on in tests and to deduplicate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Migration {
    /// Identifier of the migration; stored in the `version` primary-key column
    /// of `_premix_migrations` and used to decide whether it was already applied.
    pub version: String,
    /// Human-readable label, recorded next to the version and printed when the
    /// migration is applied.
    pub name: String,
    /// SQL executed when the migration is applied.
    pub up_sql: String,
    /// SQL intended to revert the migration (not used by the `run`
    /// implementations in this part of the file).
    pub down_sql: String,
}
12
13#[derive(Debug, Clone, sqlx::FromRow)]
14struct AppliedMigration {
15 version: String,
16}
17
18pub struct Migrator<DB: Database> {
19 pool: Pool<DB>,
20}
21
22impl<DB: Database> Migrator<DB> {
23 pub fn new(pool: Pool<DB>) -> Self {
24 Self { pool }
25 }
26}
27
#[cfg(feature = "sqlite")]
impl Migrator<sqlx::Sqlite> {
    /// Applies every migration in `migrations` whose version is not yet
    /// recorded in the `_premix_migrations` table.
    ///
    /// Steps, all inside one transaction:
    /// 1. create `_premix_migrations` if it does not exist,
    /// 2. load the set of already-recorded versions,
    /// 3. for each pending migration (in the order given), execute its
    ///    `up_sql` and record its version and name,
    /// 4. commit.
    ///
    /// Migrations are applied in the order of the input vector; they are not
    /// sorted by version here.
    ///
    /// # Errors
    ///
    /// Returns the underlying error if starting the transaction, any SQL
    /// statement, or the commit fails. On error the function returns before
    /// `commit` is reached, so this call commits nothing.
    pub async fn run(&self, migrations: Vec<Migration>) -> Result<(), Box<dyn Error>> {
        let mut tx = self.pool.begin().await?;

        sqlx::query(
            "CREATE TABLE IF NOT EXISTS _premix_migrations (
                version TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )",
        )
        .execute(&mut *tx)
        .await?;

        // Collected into a set so the per-migration membership test below is
        // O(1) instead of a linear scan over every applied version.
        let applied_versions: std::collections::HashSet<String> =
            sqlx::query_as::<_, AppliedMigration>(
                "SELECT version FROM _premix_migrations ORDER BY version ASC",
            )
            .fetch_all(&mut *tx)
            .await?
            .into_iter()
            .map(|row| row.version)
            .collect();

        for migration in migrations {
            // Already recorded: nothing to do for this one.
            if applied_versions.contains(&migration.version) {
                continue;
            }

            println!(
                "🚚 Applying migration: {} - {}",
                migration.version, migration.name
            );

            // Run the migration's SQL as-is, then record it in the same
            // transaction so both succeed or fail together.
            tx.execute(migration.up_sql.as_str()).await?;

            sqlx::query("INSERT INTO _premix_migrations (version, name) VALUES (?, ?)")
                .bind(&migration.version)
                .bind(&migration.name)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(())
    }
}
82
#[cfg(feature = "postgres")]
impl Migrator<sqlx::Postgres> {
    /// Applies every migration in `migrations` whose version is not yet
    /// recorded in the `_premix_migrations` table.
    ///
    /// Steps, all inside one transaction:
    /// 1. create `_premix_migrations` if it does not exist,
    /// 2. load the set of already-recorded versions,
    /// 3. for each pending migration (in the order given), execute its
    ///    `up_sql` and record its version and name,
    /// 4. commit.
    ///
    /// Migrations are applied in the order of the input vector; they are not
    /// sorted by version here.
    ///
    /// # Errors
    ///
    /// Returns the underlying error if starting the transaction, any SQL
    /// statement, or the commit fails. On error the function returns before
    /// `commit` is reached, so this call commits nothing.
    pub async fn run(&self, migrations: Vec<Migration>) -> Result<(), Box<dyn Error>> {
        let mut tx = self.pool.begin().await?;

        sqlx::query(
            "CREATE TABLE IF NOT EXISTS _premix_migrations (
                version TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                applied_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
            )",
        )
        .execute(&mut *tx)
        .await?;

        // Collected into a set so the per-migration membership test below is
        // O(1) instead of a linear scan over every applied version.
        let applied_versions: std::collections::HashSet<String> =
            sqlx::query_as::<_, AppliedMigration>(
                "SELECT version FROM _premix_migrations ORDER BY version ASC",
            )
            .fetch_all(&mut *tx)
            .await?
            .into_iter()
            .map(|row| row.version)
            .collect();

        for migration in migrations {
            // Already recorded: nothing to do for this one.
            if applied_versions.contains(&migration.version) {
                continue;
            }

            println!(
                "🚚 Applying migration: {} - {}",
                migration.version, migration.name
            );

            // Run the migration's SQL as-is, then record it in the same
            // transaction so both succeed or fail together.
            tx.execute(migration.up_sql.as_str()).await?;

            sqlx::query("INSERT INTO _premix_migrations (version, name) VALUES ($1, $2)")
                .bind(&migration.version)
                .bind(&migration.name)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(())
    }
}