1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
use std::error::Error;
use std::path::Path;
use bb8_postgres::bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use bb8_postgres::tokio_postgres::{NoTls};
use tokio::fs;
use tracing::info;

/// Query that yields a single boolean: whether a row with the given
/// `"MigrationId"` (`$1`) exists in `ext."__IAC"`.
const CHECK: &str = r#"SELECT EXISTS(SELECT 1 FROM ext."__IAC" WHERE "MigrationId" = $1)"#;
/// Statement that inserts a row into `ext."__IAC"` with `"MigrationId"` (`$1`)
/// and `"ProductVersion"` (`$2`).
const SAVE: &str = r#"INSERT INTO ext."__IAC" ("MigrationId", "ProductVersion") VALUES ($1, $2)"#;

/// Reports whether `migration_name` is already recorded in the migration table.
///
/// Takes a connection from `pool`, runs the [`CHECK`] query with
/// `migration_name` bound to `$1`, and returns the boolean in the first column
/// of the single result row. The connection goes out of scope when the
/// function returns.
///
/// `migration_name` is accepted as `&str`; callers holding a `&String` can
/// still pass it unchanged thanks to deref coercion.
///
/// # Errors
///
/// Propagates any error from acquiring the pooled connection or from
/// executing the query.
pub async fn check_migration(
    migration_name: &str,
    pool: &Pool<PostgresConnectionManager<NoTls>>,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
    let connection = pool.get().await?;
    let row = connection.query_one(CHECK, &[&migration_name]).await?;
    Ok(row.get::<_, bool>(0))
}

/// Applies the migration `migration_name` unless it is already recorded.
///
/// First asks [`check_migration`] whether the migration is present. If it is,
/// the function returns `Ok(())` without doing anything else. Otherwise it
/// opens a transaction and, for each entry of `file_paths` in order, reads
/// `/usr/local/bin/scripts/<entry>.sql` and runs its contents with
/// `batch_execute`. It then records the migration via [`SAVE`] (with `"rust"`
/// as the second parameter) and commits.
///
/// # Errors
///
/// Propagates any error from acquiring a pooled connection, reading a script
/// file, executing SQL, or committing. Every such error returns before
/// `commit` is reached.
pub async fn apply_migration(
    migration_name: String,
    file_paths: Vec<&str>,
    pool: &Pool<PostgresConnectionManager<NoTls>>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    info!("Checking migration: {}", migration_name);
    // Run the check BEFORE checking out our own connection: `check_migration`
    // takes a connection from the pool itself, so holding one here at the same
    // time would require two free slots and stall on a pool that has only one.
    if check_migration(&migration_name, pool).await? {
        return Ok(());
    }

    info!("Applying migration: {}", &migration_name);
    let mut connection = pool.get().await?;
    let transaction = connection.transaction().await?;
    for file in file_paths {
        let file_path = format!("/usr/local/bin/scripts/{}.sql", file);
        let query = fs::read_to_string(Path::new(&file_path)).await?;
        transaction.batch_execute(&query).await?;
    }
    // Record the migration in the same transaction as the scripts so the
    // bookkeeping row and the schema changes are committed together.
    transaction.execute(SAVE, &[&migration_name, &"rust"]).await?;
    transaction.commit().await?;
    info!("Applied migration: {}", &migration_name);
    Ok(())
}