rorm_cli/migrate/
mod.rs

1use std::cmp::Ordering;
2use std::path::Path;
3
4use anyhow::{anyhow, Context};
5use rorm_db::executor::{Executor, Nothing, Optional};
6use rorm_db::sql::create_table::CreateTable;
7use rorm_db::sql::insert::Insert;
8use rorm_db::sql::DBImpl;
9use rorm_db::Database;
10use rorm_declaration::config::DatabaseConfig;
11use rorm_declaration::imr::{Annotation, DbType};
12use rorm_declaration::migration::Migration;
13
14use crate::log_sql;
15use crate::migrate::config::{create_db_config, deserialize_db_conf};
16use crate::migrate::sql_builder::migration_to_sql;
17use crate::utils::migrations::get_existing_migrations;
18
19pub mod config;
20pub mod sql_builder;
21
/// Options for running migrations.
///
/// Consumed by [`run_migrate`], which forwards `migration_dir`, `log_queries`
/// and `apply_until` to [`run_migrate_custom`] after loading the database
/// configuration from `database_config`.
#[derive(Debug, Clone)]
pub struct MigrateOptions {
    /// Directory the migration files live in
    pub migration_dir: String,

    /// Path to the database configuration file
    pub database_config: String,

    /// Log all SQL statements
    pub log_queries: bool,

    /// Apply migrations only up to (and including) the given id, if set
    pub apply_until: Option<u16>,
}
36
37/// Helper method to apply one migration. Writes also to last migration table.
38///
39/// - `migration`: [`&Migration`](Migration): Reference to the migration to apply.
40/// - `db`: [`&Database`](Database): Database to apply the migration onto.
41/// - `last_migration_table_name`: [`&str`]: Name of the table to insert successful applied migrations into.
42pub async fn apply_migration(
43    dialect: DBImpl,
44    migration: &Migration,
45    db: &Database,
46    last_migration_table_name: &str,
47    do_log: bool,
48) -> anyhow::Result<()> {
49    let mut tx = db
50        .start_transaction()
51        .await
52        .with_context(|| format!("Error while starting transaction {}", migration.id))?;
53
54    if let Err(e) = migration_to_sql(&mut tx, dialect, migration, do_log).await {
55        tx.rollback()
56            .await
57            .with_context(|| "Error while rollback in transaction")?;
58        return Err(e);
59    }
60
61    let v: &[&[rorm_db::sql::value::Value]] =
62        &[&[rorm_db::sql::value::Value::I32(migration.id as i32)]];
63    let (query_string, bind_params) = dialect
64        .insert(last_migration_table_name, &["migration_id"], v, None)
65        .rollback_transaction()
66        .build();
67
68    if do_log {
69        println!("{query_string}");
70    }
71
72    tx.execute::<Nothing>(query_string, bind_params).await.with_context(|| {
73        format!(
74            "Error while inserting applied migration {last_migration_table_name} into last migration table",
75        )
76    })?;
77
78    println!("Applied migration {:04}_{}", migration.id, migration.name);
79
80    tx.commit().await.with_context(|| {
81        format!("Error while committing transaction {last_migration_table_name}",)
82    })?;
83
84    Ok(())
85}
86
87/// Applies migrations on the given database with a given driver
88pub async fn run_migrate_custom(
89    db_conf: DatabaseConfig,
90    migration_dir: String,
91    log_sql: bool,
92    apply_until: Option<u16>,
93) -> anyhow::Result<()> {
94    let p = Path::new(migration_dir.as_str());
95    if !p.exists() || p.is_file() {
96        println!(
97            "Couldn't find the migration directory in {} \n\n\
98            You can specify an alternative path with --migration-dir <PATH>",
99            migration_dir.as_str()
100        );
101        return Ok(());
102    }
103
104    let existing_migrations = get_existing_migrations(migration_dir.as_str())
105        .with_context(|| "Couldn't retrieve existing migrations")?;
106
107    if existing_migrations.is_empty() {
108        println!("No migrations found.\nExiting.");
109        return Ok(());
110    }
111
112    let pool = Database::connect(rorm_db::DatabaseConfiguration {
113        driver: db_conf.driver,
114        min_connections: 1,
115        max_connections: 1,
116    })
117    .await?;
118
119    let last_migration_table_name = db_conf
120        .last_migration_table_name
121        .as_ref()
122        .map_or("_rorm__last_migration", |x| x.as_str());
123
124    let db_impl = (&pool).dialect();
125    let statements = db_impl
126        .create_table(last_migration_table_name)
127        .add_column(db_impl.create_column(
128            last_migration_table_name,
129            "id",
130            DbType::Int64,
131            &[Annotation::PrimaryKey, Annotation::AutoIncrement],
132        ))
133        .add_column(db_impl.create_column(
134            last_migration_table_name,
135            "updated_at",
136            DbType::DateTime,
137            &[Annotation::AutoUpdateTime],
138        ))
139        .add_column(db_impl.create_column(
140            last_migration_table_name,
141            "migration_id",
142            DbType::Int32,
143            &[Annotation::NotNull],
144        ))
145        .if_not_exists()
146        .build()?;
147
148    let mut tx = pool
149        .start_transaction()
150        .await
151        .with_context(|| "Could not create transaction")?;
152
153    for (query_string, bind_params) in statements {
154        if log_sql {
155            println!("{}", query_string.as_str());
156        }
157
158        tx.execute::<Nothing>(query_string, bind_params)
159            .await
160            .with_context(|| "Couldn't create internal last migration table")?;
161    }
162
163    tx.commit()
164        .await
165        .with_context(|| "Couldn't create internal last migration table")?;
166
167    let last_migration: Option<i32> = pool
168        .execute::<Optional>(
169            log_sql!(
170                format!(
171                    "SELECT migration_id FROM {} ORDER BY id DESC LIMIT 1;",
172                    &last_migration_table_name
173                ),
174                log_sql
175            ),
176            Vec::new(),
177        )
178        .await
179        .and_then(|option| option.map(|row| row.get(0)).transpose().map_err(Into::into))
180        .with_context(|| {
181            "Couldn't fetch information about successful migrations from migration table"
182        })?;
183
184    match last_migration {
185        None => {
186            // Apply all migrations
187            for migration in &existing_migrations {
188                apply_migration(
189                    db_impl,
190                    migration,
191                    &pool,
192                    last_migration_table_name,
193                    log_sql,
194                )
195                .await?;
196
197                if let Some(apply_until) = apply_until {
198                    if migration.id == apply_until {
199                        println!(
200                            "Applied all migrations until (inclusive) migration {apply_until:04}"
201                        );
202                        break;
203                    }
204                }
205            }
206        }
207        Some(id) => {
208            let id = id as u16;
209            // Search for last applied migration
210            if existing_migrations.iter().any(|x| x.id == id) {
211                let mut apply = false;
212                for (idx, migration) in existing_migrations.iter().enumerate() {
213                    if apply {
214                        apply_migration(
215                            db_impl,
216                            migration,
217                            &pool,
218                            last_migration_table_name,
219                            log_sql,
220                        )
221                        .await?;
222                        continue;
223                    }
224
225                    if migration.id == id {
226                        apply = true;
227
228                        if idx == existing_migrations.len() - 1 {
229                            println!("All migration have already been applied.");
230                        }
231                    }
232
233                    if let Some(apply_until) = apply_until {
234                        match migration.id.cmp(&apply_until) {
235                            Ordering::Equal => {
236                                if apply {
237                                    println!(
238                                        "Applied all migrations until (inclusive) migration {apply_until:04}"
239                                    );
240                                } else {
241                                    println!(
242                                        "All migrations until (inclusive) migration {apply_until:04} have already been applied"
243                                    );
244                                }
245                                break;
246                            }
247                            Ordering::Greater => break,
248                            Ordering::Less => {}
249                        }
250                    }
251                }
252            } else {
253                // If last applied migration could not be found in existing migrations,
254                // panic as there's no way to determine what to do next
255                return Err(anyhow!(
256                    r#"Last applied migration {id} was not found in current migrations.
257 
258Can not proceed any further without damaging data.
259To correct, empty the {last_migration_table_name} table or reset the whole database."#,
260                ));
261            }
262        }
263    }
264
265    Ok(())
266}
267
268/// Applies migrations on the given database
269pub async fn run_migrate(options: MigrateOptions) -> anyhow::Result<()> {
270    let db_conf_path = Path::new(options.database_config.as_str());
271
272    if !&db_conf_path.exists() {
273        println!(
274            "Couldn't find the database configuration file, created {} and exiting",
275            options.database_config.as_str()
276        );
277        create_db_config(db_conf_path)?;
278        return Ok(());
279    }
280
281    let db_conf = deserialize_db_conf(db_conf_path)?;
282
283    run_migrate_custom(
284        db_conf,
285        options.migration_dir,
286        options.log_queries,
287        options.apply_until,
288    )
289    .await
290}