1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
mod m000001;
mod m000002;
mod m000003;
mod m000004;
mod m000005;
mod m000006;
mod m000007;
mod m000008;
mod m000009;
mod m000010;
mod m000011;
mod m000012;
mod m000013;
mod m000014;
mod m000015;
mod m000016;
mod m000017;
mod m000018;

use sqlx::{query, Acquire, Error as SqlxError, PgExecutor, Postgres, Row};
use tracing::info;

use m000001::M000001_MIGRATION;
use m000002::M000002_MIGRATION;
use m000003::M000003_MIGRATION;
use m000004::M000004_MIGRATION;
use m000005::M000005_MIGRATION;
use m000006::M000006_MIGRATION;
use m000007::M000007_MIGRATION;
use m000008::M000008_MIGRATION;
use m000009::M000009_MIGRATION;
use m000010::M000010_MIGRATION;
use m000011::M000011_MIGRATION;
use m000012::M000012_MIGRATION;
use m000013::M000013_MIGRATION;
use m000014::M000014_MIGRATION;
use m000015::M000015_MIGRATION;
use m000016::M000016_MIGRATION;
use m000017::M000017_MIGRATION;
use m000018::M000018_MIGRATION;

pub const MIGRATIONS: &[&[&str]] = &[
    M000001_MIGRATION,
    M000002_MIGRATION,
    M000003_MIGRATION,
    M000004_MIGRATION,
    M000005_MIGRATION,
    M000006_MIGRATION,
    M000007_MIGRATION,
    M000008_MIGRATION,
    M000009_MIGRATION,
    M000010_MIGRATION,
    M000011_MIGRATION,
    M000012_MIGRATION,
    M000013_MIGRATION,
    M000014_MIGRATION,
    M000015_MIGRATION,
    M000016_MIGRATION,
    M000017_MIGRATION,
    M000018_MIGRATION,
];

async fn install_schema<'e, E>(executor: E, escaped_schema: &str) -> Result<(), sqlx::Error>
where
    E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Clone,
{
    let create_schema_query = format!(
        r#"
            create schema {escaped_schema};
        "#
    );

    let create_migration_table_query = format!(
        r#"
            create table {escaped_schema}.migrations (
                id int primary key, 
                ts timestamptz default now() not null
            );
        "#
    );

    let mut tx = executor.begin().await?;
    query(&create_schema_query).execute(tx.as_mut()).await?;
    query(&create_migration_table_query)
        .execute(tx.as_mut())
        .await?;
    tx.commit().await?;

    Ok(())
}

pub async fn migrate<'e, E>(executor: E, escaped_schema: &str) -> Result<(), sqlx::Error>
where
    E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Send + Sync + Clone,
{
    let migrations_status_query =
        format!("select id from {escaped_schema}.migrations order by id desc limit 1");
    let last_migration_query_result = query(&migrations_status_query)
        .fetch_optional(executor.clone())
        .await;

    let last_migration = match last_migration_query_result {
        Err(SqlxError::Database(e)) => {
            let Some(code) = e.code() else {
                return Err(SqlxError::Database(e));
            };

            if code == "42P01" {
                install_schema(executor.clone(), escaped_schema).await?;
            } else {
                return Err(SqlxError::Database(e));
            }

            None
        }
        Err(e) => {
            return Err(e);
        }
        Ok(optional_row) => optional_row.map(|row| row.get("id")),
    };

    for (i, migration_statements) in MIGRATIONS.iter().enumerate() {
        let migration_number = (i + 1) as i32;

        if last_migration.is_none() || migration_number > last_migration.unwrap() {
            info!(migration_number, "Executing migration");
            let mut tx = executor.clone().begin().await?;

            for migration_statement in migration_statements.iter() {
                let sql = migration_statement.replace(":ARCHIMEDES_SCHEMA", escaped_schema);
                query(sql.as_str()).execute(tx.as_mut()).await?;
            }

            let sql = format!("insert into {escaped_schema}.migrations (id) values ($1)");
            query(&sql)
                .bind(migration_number)
                .execute(tx.as_mut())
                .await?;

            tx.commit().await?;
        }
    }

    Ok(())
}