Skip to main content

rorm_cli/migrate/
mod.rs

1use std::cmp::Ordering;
2use std::path::Path;
3
4use anyhow::{anyhow, Context};
5use rorm_db::executor::{Executor, Nothing, Optional};
6use rorm_db::sql::create_table::CreateTable;
7use rorm_db::transaction::TransactionError;
8use rorm_db::Database;
9use rorm_declaration::config::DatabaseConfig;
10use rorm_declaration::imr::{Annotation, DbType};
11use tracing::{error, info};
12
13use crate::migrate::apply::apply_migration;
14use crate::migrate::config::{create_db_config, deserialize_db_conf};
15use crate::utils::migrations::get_existing_migrations;
16
17pub mod apply;
18pub mod config;
19
/// Options for running migrations.
///
/// Consumed by [`run_migrate`], which loads the database configuration and
/// forwards the remaining fields to [`run_migrate_custom`].
#[derive(Debug, Clone)]
pub struct MigrateOptions {
    /// Directory that contains the migration files.
    pub migration_dir: String,

    /// Path to the database configuration file.
    pub database_config: String,

    /// If set, apply migrations only up to (and including) this migration id.
    pub apply_until: Option<u16>,
}
31
32/// Applies migrations on the given database with a given driver
33pub async fn run_migrate_custom(
34    db_conf: DatabaseConfig,
35    migration_dir: String,
36    apply_until: Option<u16>,
37) -> anyhow::Result<()> {
38    let p = Path::new(migration_dir.as_str());
39    if !p.exists() || p.is_file() {
40        error!(
41            "Couldn't find the migration directory in {} \n\n\
42            You can specify an alternative path with --migration-dir <PATH>",
43            migration_dir.as_str()
44        );
45        return Ok(());
46    }
47
48    let existing_migrations = get_existing_migrations(migration_dir.as_str())
49        .with_context(|| "Couldn't retrieve existing migrations")?;
50
51    if existing_migrations.is_empty() {
52        info!("No migrations found.\nExiting.");
53        return Ok(());
54    }
55
56    let db = Database::connect(rorm_db::DatabaseConfiguration {
57        driver: db_conf.driver,
58        min_connections: 1,
59        max_connections: 1,
60    })
61    .await?;
62
63    let last_migration_table_name = db_conf
64        .last_migration_table_name
65        .as_ref()
66        .map_or("_rorm__last_migration", |x| x.as_str());
67
68    let db_impl = (&db).dialect();
69    let statements = db_impl
70        .create_table(last_migration_table_name)
71        .add_column(db_impl.create_column(
72            last_migration_table_name,
73            "id",
74            DbType::Int64,
75            &[Annotation::PrimaryKey, Annotation::AutoIncrement],
76        ))
77        .add_column(db_impl.create_column(
78            last_migration_table_name,
79            "updated_at",
80            DbType::DateTime,
81            &[Annotation::AutoUpdateTime],
82        ))
83        .add_column(db_impl.create_column(
84            last_migration_table_name,
85            "migration_id",
86            DbType::Int32,
87            &[Annotation::NotNull],
88        ))
89        .if_not_exists()
90        .build()?;
91
92    let mut tx = db
93        .start_transaction()
94        .await
95        .with_context(|| "Could not create transaction")?;
96
97    for (query_string, bind_params) in statements {
98        tx.execute::<Nothing>(query_string, bind_params)
99            .await
100            .with_context(|| "Couldn't create internal last migration table")?;
101    }
102
103    tx.commit()
104        .await
105        .map_err(|x| match x {
106            TransactionError::Database(x) => x,
107            TransactionError::Hook(_) => unreachable!("rorm-cli does not use hooks"),
108        })
109        .with_context(|| "Couldn't create internal last migration table")?;
110
111    let last_migration: Option<i32> = db
112        .execute::<Optional>(
113            format!(
114                "SELECT migration_id FROM {} ORDER BY id DESC LIMIT 1;",
115                &last_migration_table_name
116            ),
117            Vec::new(),
118        )
119        .await
120        .and_then(|option| option.map(|row| row.get(0)).transpose().map_err(Into::into))
121        .with_context(|| {
122            "Couldn't fetch information about successful migrations from migration table"
123        })?;
124
125    match last_migration {
126        None => {
127            // Apply all migrations
128            for migration in &existing_migrations {
129                apply_migration(&db, migration, last_migration_table_name).await?;
130                info!("Applied migration {:04}_{}", migration.id, migration.name);
131
132                if let Some(apply_until) = apply_until {
133                    if migration.id == apply_until {
134                        info!(
135                            "Applied all migrations until (inclusive) migration {apply_until:04}"
136                        );
137                        break;
138                    }
139                }
140            }
141        }
142        Some(id) => {
143            let id = id as u16;
144            // Search for last applied migration
145            if existing_migrations.iter().any(|x| x.id == id) {
146                let mut apply = false;
147                for (idx, migration) in existing_migrations.iter().enumerate() {
148                    if apply {
149                        apply_migration(&db, migration, last_migration_table_name).await?;
150                        info!("Applied migration {:04}_{}", migration.id, migration.name);
151                        continue;
152                    }
153
154                    if migration.id == id {
155                        apply = true;
156
157                        if idx == existing_migrations.len() - 1 {
158                            info!("All migration have already been applied.");
159                        }
160                    }
161
162                    if let Some(apply_until) = apply_until {
163                        match migration.id.cmp(&apply_until) {
164                            Ordering::Equal => {
165                                if apply {
166                                    info!(
167                                        "Applied all migrations until (inclusive) migration {apply_until:04}"
168                                    );
169                                } else {
170                                    info!(
171                                        "All migrations until (inclusive) migration {apply_until:04} have already been applied"
172                                    );
173                                }
174                                break;
175                            }
176                            Ordering::Greater => break,
177                            Ordering::Less => {}
178                        }
179                    }
180                }
181            } else {
182                // If last applied migration could not be found in existing migrations,
183                // panic as there's no way to determine what to do next
184                return Err(anyhow!(
185                    r#"Last applied migration {id} was not found in current migrations.
186 
187Can not proceed any further without damaging data.
188To correct, empty the {last_migration_table_name} table or reset the whole database."#,
189                ));
190            }
191        }
192    }
193
194    db.close().await;
195    Ok(())
196}
197
198/// Applies migrations on the given database
199pub async fn run_migrate(options: MigrateOptions) -> anyhow::Result<()> {
200    let db_conf_path = Path::new(options.database_config.as_str());
201
202    if !&db_conf_path.exists() {
203        error!(
204            "Couldn't find the database configuration file, created {} and exiting",
205            options.database_config.as_str()
206        );
207        create_db_config(db_conf_path)?;
208        return Ok(());
209    }
210
211    let db_conf = deserialize_db_conf(db_conf_path)?;
212
213    run_migrate_custom(db_conf, options.migration_dir, options.apply_until).await
214}