1use std::collections::HashMap;
2use std::error::Error;
3use std::ffi::OsStr;
4use std::fmt::Debug;
5use std::path::Path;
6use bb8_postgres::bb8::Pool;
7use bb8_postgres::PostgresConnectionManager;
8use bb8_postgres::tokio_postgres::{NoTls};
9use tokio::fs;
10use tracing::{error, info};
11use dotenv::from_path_iter;
12
/// Returns a single boolean row: whether `ext."__IAC"` already holds the migration id bound to `$1`.
const CHECK: &str = "SELECT EXISTS(SELECT 1 FROM ext.\"__IAC\" WHERE \"MigrationId\" = $1)";
/// Records a migration in `ext."__IAC"`; `$1` is the migration id and `$2` the product version.
const SAVE: &str = "INSERT INTO ext.\"__IAC\" (\"MigrationId\", \"ProductVersion\") VALUES ($1, $2)";
/// Creates the `ext."__IAC"` bookkeeping table (primary key `MigrationId`) when it does not exist yet.
const INITIALIZE: &str = "create table if not exists ext.\"__IAC\"
(
\"MigrationId\" varchar(150) not null
 constraint \"PK___IAC\"
 primary key,
 \"ProductVersion\" varchar(32) not null
);";
/// Transfers ownership of the `ext."__IAC"` table to the `MIS_USER` role.
const GRANT_OWNER: &str = "alter table ext.\"__IAC\" owner to \"MIS_USER\";";

/// Creates the `ext` schema when it does not exist yet.
const CREATE_SCHEMA: &str = "CREATE SCHEMA IF NOT EXISTS ext;";
/// Transfers ownership of the `ext` schema to the `MIS_USER` role.
const SET_SCHEMA_OWNER: &str = "ALTER SCHEMA ext OWNER TO \"MIS_USER\";";

/// Returns a single boolean row: whether `pg_extension` lists the `uuid-ossp` extension.
const CHECK_EXTENSION: &str = "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'uuid-ossp');";
/// Installs the `uuid-ossp` extension when it is not installed yet.
const CREATE_EXTENSION: &str = "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";";
29
30pub async fn prepare(pool: &Pool<PostgresConnectionManager<NoTls>>) -> Result<(), Box<dyn Error + Send + Sync>> {
31 let connection = pool.get().await?;
32 connection.execute(CREATE_SCHEMA, &[]).await?;
33 connection.execute(SET_SCHEMA_OWNER, &[]).await?;
34 let extension_exists: bool = connection.query_one(CHECK_EXTENSION, &[]).await?.get(0);
35 if !extension_exists {
36 connection.execute(CREATE_EXTENSION, &[]).await?;
38 }
39 connection.execute(INITIALIZE, &[]).await?;
40 connection.execute(GRANT_OWNER, &[]).await?;
41 Ok(())
42}
43
44pub async fn check_migration(migration_name: &String, pool: &Pool<PostgresConnectionManager<NoTls>>) -> Result<bool, Box<dyn Error + Send + Sync>> {
45 let connection = pool.get().await?;
46 let row = connection.query_one(CHECK, &[migration_name]).await?;
47 let exists: bool = row.get(0);
48 Ok(exists)
49}
50
51pub async fn apply_migration(migration_name: String, file_names: Vec<&str>, pool: &Pool<PostgresConnectionManager<NoTls>>) -> Result<(), Box<dyn Error + Send + Sync>> {
52 let mut connection = pool.get().await?;
53 info!("Checking migration: {}", migration_name);
54 let result = check_migration(&migration_name, pool).await?;
55 if !result {
56 info!("Applying migration: {}", &migration_name);
57 let transaction = connection.transaction().await?;
58 for file_name in file_names {
59 let file_path = format!("/usr/local/bin/scripts/{}.sql", file_name);
60 let query = fs::read_to_string(Path::new(&file_path)).await?;
61 transaction.batch_execute(&query).await?;
62 }
63 transaction.execute(SAVE, &[&migration_name, &"rust"]).await?;
64 transaction.commit().await?;
65 info!("Applied migration: {}", &migration_name);
66 }
67 Ok(())
68}
69
70pub async fn apply_migrations_from_dir(dir: String, manager_pool: &Pool<PostgresConnectionManager<NoTls>>, target_pool: &Pool<PostgresConnectionManager<NoTls>>, env_path: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
71 let mut manager_connection = manager_pool.get().await?;
72 let mut target_connection = target_pool.get().await?;
73
74 let mut paths: Vec<_> = std::fs::read_dir(dir).unwrap().map(|res| res.unwrap()).collect();
75 paths.sort_by_key(|entry| {
76 entry.file_name().to_string_lossy().split_once('-')
77 .map(|(prefix, _)| prefix.parse::<i32>().unwrap_or(i32::MAX))
78 .unwrap_or(i32::MAX)
79 });
80 for path in paths {
81 let path = path.path();
82 if path.extension().and_then(|ext| ext.to_str()) == Some("sql") {
83 let migration_name = path.file_name().unwrap().to_str().unwrap().to_string();
84 info!("Checking migration: {}", migration_name);
85 let result = check_migration(&migration_name, manager_pool).await?;
86 if !result {
87 info!("Applying migration: {}", &migration_name);
88 let target_transaction = target_connection.transaction().await?;
89 let mut query = fs::read_to_string(Path::new(&path)).await?;
90 replace_vars_in_string(&mut query, env_path);
91 if let Err(e) = target_transaction.batch_execute(&query).await {
92 error!("Migration {} query error: {:#?}", &migration_name, e)
93 }
94 if let Err(e) = target_transaction.commit().await {
95 error!("Migration {} commit error: {:#?}", &migration_name, e)
96 }
97
98 let manager_transaction = manager_connection.transaction().await?;
99 manager_transaction.execute(SAVE, &[&migration_name, &"rust"]).await?;
100 manager_transaction.commit().await?;
101
102 info!("Applied migration: {:#?}", &migration_name);
103 }
104 }
105 }
106 Ok(())
107}
108
109pub fn replace_vars_in_string(input: &mut String, env_path: &str) {
110 let dotenv_vars = from_path_iter(env_path).expect("Failed to read .env file");
111 let mut env_vars: HashMap<String, String> = HashMap::new();
112 for item in dotenv_vars {
113 let (key, value) = item.expect("Failed to parse .env file");
114 env_vars.insert(key, value);
115 }
116 for (key, value) in &env_vars {
117 let pattern = format!("[{}]", key);
118 *input = input.replace(&pattern, value);
119 }
120}