1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use std::collections::HashMap;
use std::error::Error;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::path::Path;
use bb8_postgres::bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use bb8_postgres::tokio_postgres::{NoTls};
use tokio::fs;
use tracing::{error, info};
use dotenv::from_path_iter;

const CHECK: &str = "SELECT EXISTS(SELECT 1 FROM ext.\"__IAC\" WHERE \"MigrationId\" = $1)";
const SAVE: &str = "INSERT INTO ext.\"__IAC\" (\"MigrationId\", \"ProductVersion\") VALUES ($1, $2)";
const INITIALIZE: &str = "create table if not exists ext.\"__IAC\"
(
\"MigrationId\"    varchar(150) not null
        constraint \"PK___IAC\"
            primary key,
    \"ProductVersion\" varchar(32)  not null
);";
const GRANT_OWNER: &str = "alter table ext.\"__IAC\" owner to \"MIS_USER\";";

pub async fn prepare(pool: &Pool<PostgresConnectionManager<NoTls>>) -> Result<(), Box<dyn Error + Send + Sync>> {
    let connection = pool.get().await?;
    connection.execute(INITIALIZE, &[]).await?;
    connection.execute(GRANT_OWNER, &[]).await?;
    Ok(())
}

pub async fn check_migration(migration_name: &String, pool: &Pool<PostgresConnectionManager<NoTls>>) -> Result<bool, Box<dyn Error + Send + Sync>> {
    let connection = pool.get().await?;
    let row = connection.query_one(CHECK, &[migration_name]).await?;
    let exists: bool = row.get(0);
    Ok(exists)
}

pub async fn apply_migration(migration_name: String, file_names: Vec<&str>, pool: &Pool<PostgresConnectionManager<NoTls>>) -> Result<(), Box<dyn Error + Send + Sync>> {
    let mut connection = pool.get().await?;
    info!("Checking migration: {}", migration_name);
    let result = check_migration(&migration_name, pool).await?;
    if !result {
        info!("Applying migration: {}", &migration_name);
        let transaction = connection.transaction().await?;
        for file_name in file_names {
            let file_path = format!("/usr/local/bin/scripts/{}.sql", file_name);
            let query = fs::read_to_string(Path::new(&file_path)).await?;
            transaction.batch_execute(&query).await?;
        }
        transaction.execute(SAVE, &[&migration_name, &"rust"]).await?;
        transaction.commit().await?;
        info!("Applied migration: {}", &migration_name);
    }
    Ok(())
}

pub async fn apply_migrations_from_dir(dir: String, manager_pool: &Pool<PostgresConnectionManager<NoTls>>, target_pool: &Pool<PostgresConnectionManager<NoTls>>, env_path: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
    let mut manager_connection = manager_pool.get().await?;
    let mut target_connection = target_pool.get().await?;

    let mut paths: Vec<_>  = std::fs::read_dir(dir).unwrap().map(|res| res.unwrap()).collect();
    paths.sort_by_key(|entry| entry.file_name());
    for path in paths {
        let path = path.path();
        if path.extension().and_then(|ext| ext.to_str()) == Some("sql") {
            let migration_name = path.file_name().unwrap().to_str().unwrap().to_string();
            info!("Checking migration: {}", migration_name);
            let result = check_migration(&migration_name, manager_pool).await?;
            if !result {
                info!("Applying migration: {}", &migration_name);
                let target_transaction = target_connection.transaction().await?;
                let mut query = fs::read_to_string(Path::new(&path)).await?;
                replace_vars_in_string(&mut query, env_path);
                if let Err(e) = target_transaction.batch_execute(&query).await {
                    error!("Migration {} query error: {:#?}", &migration_name, e)
                }
                if let Err(e) = target_transaction.commit().await {
                    error!("Migration {} commit error: {:#?}", &migration_name, e)
                }

                let manager_transaction = manager_connection.transaction().await?;
                manager_transaction.execute(SAVE, &[&migration_name, &"rust"]).await?;
                manager_transaction.commit().await?;

                info!("Applied migration: {:#?}", &migration_name);
            }
        }
    }
    Ok(())
}

pub fn replace_vars_in_string(input: &mut String, env_path: &str) {
    let dotenv_vars = from_path_iter(env_path).expect("Failed to read .env file");
    let mut env_vars: HashMap<String, String> = HashMap::new();
    for item in dotenv_vars {
        let (key, value) = item.expect("Failed to parse .env file");
        env_vars.insert(key, value);
    }
    for (key, value) in &env_vars {
        let pattern = format!("[{}]", key);
        *input = input.replace(&pattern, value);
    }
}