migrations_rs/
lib.rs

1use std::error::Error;
2use std::path::Path;
3use bb8_postgres::bb8::Pool;
4use bb8_postgres::PostgresConnectionManager;
5use bb8_postgres::tokio_postgres::{NoTls};
6use tokio::fs;
7use tracing::{error, info};
8use dotenv::from_path;
9
/// Returns a single boolean: whether `ext."__IAC"` already has a row for the
/// migration id bound to `$1`.
const CHECK: &str = r#"SELECT EXISTS(SELECT 1 FROM ext."__IAC" WHERE "MigrationId" = $1)"#;

/// Inserts a tracking row; `$1` is the migration id, `$2` the product version.
const SAVE: &str = r#"INSERT INTO ext."__IAC" ("MigrationId", "ProductVersion") VALUES ($1, $2)"#;

/// Creates the tracking table `ext."__IAC"` when it is missing.
const INITIALIZE: &str = r#"create table if not exists ext."__IAC"
(
"MigrationId"    varchar(150) not null
        constraint "PK___IAC"
            primary key,
    "ProductVersion" varchar(32)  not null
);"#;

/// Transfers ownership of the tracking table to the `MIS_USER` role.
const GRANT_OWNER: &str = r#"alter table ext."__IAC" owner to "MIS_USER";"#;

/// Creates the `ext` schema when it is missing.
const CREATE_SCHEMA: &str = r#"CREATE SCHEMA IF NOT EXISTS ext;"#;

/// Transfers ownership of the `ext` schema to the `MIS_USER` role.
const SET_SCHEMA_OWNER: &str = r#"ALTER SCHEMA ext OWNER TO "MIS_USER";"#;

/// Returns a single boolean: whether `pg_extension` lists `uuid-ossp`.
const CHECK_EXTENSION: &str =
    r#"SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'uuid-ossp');"#;

/// Installs the `uuid-ossp` extension when it is missing.
const CREATE_EXTENSION: &str = r#"CREATE EXTENSION IF NOT EXISTS "uuid-ossp";"#;
26
27pub async fn prepare(pool: &Pool<PostgresConnectionManager<NoTls>>) -> Result<(), Box<dyn Error + Send + Sync>> {
28    let connection = pool.get().await?;
29    connection.execute(CREATE_SCHEMA, &[]).await?;
30    connection.execute(SET_SCHEMA_OWNER, &[]).await?;
31    let extension_exists: bool = connection.query_one(CHECK_EXTENSION, &[]).await?.get(0);
32    if !extension_exists {
33        // Create the extension if it doesn't exist
34        connection.execute(CREATE_EXTENSION, &[]).await?;
35    }
36    connection.execute(INITIALIZE, &[]).await?;
37    connection.execute(GRANT_OWNER, &[]).await?;
38    Ok(())
39}
40
41pub async fn check_migration(migration_name: &String, pool: &Pool<PostgresConnectionManager<NoTls>>) -> Result<bool, Box<dyn Error + Send + Sync>> {
42    let connection = pool.get().await?;
43    let row = connection.query_one(CHECK, &[migration_name]).await?;
44    let exists: bool = row.get(0);
45    Ok(exists)
46}
47
48pub async fn apply_migration(migration_name: String, file_names: Vec<&str>, pool: &Pool<PostgresConnectionManager<NoTls>>) -> Result<(), Box<dyn Error + Send + Sync>> {
49    let mut connection = pool.get().await?;
50    info!("Checking migration: {}", migration_name);
51    let result = check_migration(&migration_name, pool).await?;
52    if !result {
53        info!("Applying migration: {}", &migration_name);
54        let transaction = connection.transaction().await?;
55        for file_name in file_names {
56            let file_path = format!("/usr/local/bin/scripts/{}.sql", file_name);
57            let query = fs::read_to_string(Path::new(&file_path)).await?;
58            transaction.batch_execute(&query).await?;
59        }
60        transaction.execute(SAVE, &[&migration_name, &"rust"]).await?;
61        transaction.commit().await?;
62        info!("Applied migration: {}", &migration_name);
63    }
64    Ok(())
65}
66
67pub async fn apply_migrations_from_dir(dir: String, manager_pool: &Pool<PostgresConnectionManager<NoTls>>, target_pool: &Pool<PostgresConnectionManager<NoTls>>, env_path: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
68    let mut manager_connection = manager_pool.get().await?;
69    let mut target_connection = target_pool.get().await?;
70
71    let mut paths: Vec<_>  = std::fs::read_dir(dir).unwrap().map(|res| res.unwrap()).collect();
72    paths.sort_by_key(|entry| {
73        entry.file_name().to_string_lossy().split_once('-')
74            .map(|(prefix, _)| prefix.parse::<i32>().unwrap_or(i32::MAX))
75            .unwrap_or(i32::MAX)
76    });
77    for path in paths {
78        let path = path.path();
79        if path.extension().and_then(|ext| ext.to_str()) == Some("sql") {
80            let migration_name = path.file_name().unwrap().to_str().unwrap().to_string();
81            info!("Checking migration: {}", migration_name);
82            let result = check_migration(&migration_name, manager_pool).await?;
83            if !result {
84                info!("Applying migration: {}", &migration_name);
85                let target_transaction = target_connection.transaction().await?;
86                let mut query = fs::read_to_string(Path::new(&path)).await?;
87                replace_vars_in_string(&mut query, env_path);
88                if let Err(e) = target_transaction.batch_execute(&query).await {
89                    error!("Migration {} query error: {:#?}", &migration_name, e)
90                }
91                if let Err(e) = target_transaction.commit().await {
92                    error!("Migration {} commit error: {:#?}", &migration_name, e)
93                }
94
95                let manager_transaction = manager_connection.transaction().await?;
96                manager_transaction.execute(SAVE, &[&migration_name, &"rust"]).await?;
97                manager_transaction.commit().await?;
98
99                info!("Applied migration: {:#?}", &migration_name);
100            }
101        }
102    }
103    Ok(())
104}
105
106// pub fn replace_vars_in_string(input: &mut String, env_path: &str) {
107//     let dotenv_vars = from_path_iter(env_path).expect("Failed to read .env file");
108//     let mut env_vars: HashMap<String, String> = HashMap::new();
109//     for item in dotenv_vars {
110//         let (key, value) = item.expect("Failed to parse .env file");
111//         env_vars.insert(key, value);
112//     }
113//     for (key, value) in &env_vars {
114//         let pattern = format!("[{}]", key);
115//         *input = input.replace(&pattern, value);
116//     }
117// }
118
119pub fn replace_vars_in_string(input: &mut String, env_path: &str) {
120    from_path(env_path).expect("Failed to read .env file");
121    let dotenv_vars = dotenv::vars();
122    for (key, value) in dotenv_vars {
123        let pattern = format!("[{}]", key);
124        *input = input.replace(&pattern, value.as_str());
125    }
126}