1use std::cmp::Ordering;
2use std::path::Path;
3
4use anyhow::{anyhow, Context};
5use rorm_db::executor::{Executor, Nothing, Optional};
6use rorm_db::sql::create_table::CreateTable;
7use rorm_db::transaction::TransactionError;
8use rorm_db::Database;
9use rorm_declaration::config::DatabaseConfig;
10use rorm_declaration::imr::{Annotation, DbType};
11use tracing::{error, info};
12
13use crate::migrate::apply::apply_migration;
14use crate::migrate::config::{create_db_config, deserialize_db_conf};
15use crate::utils::migrations::get_existing_migrations;
16
17pub mod apply;
18pub mod config;
19
/// Options for the `migrate` command, consumed by [`run_migrate`].
#[derive(Debug, Clone)]
pub struct MigrateOptions {
    /// Path of the directory that is searched for migrations.
    pub migration_dir: String,

    /// Path of the database configuration file.
    pub database_config: String,

    /// If set, id of the last migration (inclusive) that should be applied.
    pub apply_until: Option<u16>,
}
31
32pub async fn run_migrate_custom(
34 db_conf: DatabaseConfig,
35 migration_dir: String,
36 apply_until: Option<u16>,
37) -> anyhow::Result<()> {
38 let p = Path::new(migration_dir.as_str());
39 if !p.exists() || p.is_file() {
40 error!(
41 "Couldn't find the migration directory in {} \n\n\
42 You can specify an alternative path with --migration-dir <PATH>",
43 migration_dir.as_str()
44 );
45 return Ok(());
46 }
47
48 let existing_migrations = get_existing_migrations(migration_dir.as_str())
49 .with_context(|| "Couldn't retrieve existing migrations")?;
50
51 if existing_migrations.is_empty() {
52 info!("No migrations found.\nExiting.");
53 return Ok(());
54 }
55
56 let db = Database::connect(rorm_db::DatabaseConfiguration {
57 driver: db_conf.driver,
58 min_connections: 1,
59 max_connections: 1,
60 })
61 .await?;
62
63 let last_migration_table_name = db_conf
64 .last_migration_table_name
65 .as_ref()
66 .map_or("_rorm__last_migration", |x| x.as_str());
67
68 let db_impl = (&db).dialect();
69 let statements = db_impl
70 .create_table(last_migration_table_name)
71 .add_column(db_impl.create_column(
72 last_migration_table_name,
73 "id",
74 DbType::Int64,
75 &[Annotation::PrimaryKey, Annotation::AutoIncrement],
76 ))
77 .add_column(db_impl.create_column(
78 last_migration_table_name,
79 "updated_at",
80 DbType::DateTime,
81 &[Annotation::AutoUpdateTime],
82 ))
83 .add_column(db_impl.create_column(
84 last_migration_table_name,
85 "migration_id",
86 DbType::Int32,
87 &[Annotation::NotNull],
88 ))
89 .if_not_exists()
90 .build()?;
91
92 let mut tx = db
93 .start_transaction()
94 .await
95 .with_context(|| "Could not create transaction")?;
96
97 for (query_string, bind_params) in statements {
98 tx.execute::<Nothing>(query_string, bind_params)
99 .await
100 .with_context(|| "Couldn't create internal last migration table")?;
101 }
102
103 tx.commit()
104 .await
105 .map_err(|x| match x {
106 TransactionError::Database(x) => x,
107 TransactionError::Hook(_) => unreachable!("rorm-cli does not use hooks"),
108 })
109 .with_context(|| "Couldn't create internal last migration table")?;
110
111 let last_migration: Option<i32> = db
112 .execute::<Optional>(
113 format!(
114 "SELECT migration_id FROM {} ORDER BY id DESC LIMIT 1;",
115 &last_migration_table_name
116 ),
117 Vec::new(),
118 )
119 .await
120 .and_then(|option| option.map(|row| row.get(0)).transpose().map_err(Into::into))
121 .with_context(|| {
122 "Couldn't fetch information about successful migrations from migration table"
123 })?;
124
125 match last_migration {
126 None => {
127 for migration in &existing_migrations {
129 apply_migration(&db, migration, last_migration_table_name).await?;
130 info!("Applied migration {:04}_{}", migration.id, migration.name);
131
132 if let Some(apply_until) = apply_until {
133 if migration.id == apply_until {
134 info!(
135 "Applied all migrations until (inclusive) migration {apply_until:04}"
136 );
137 break;
138 }
139 }
140 }
141 }
142 Some(id) => {
143 let id = id as u16;
144 if existing_migrations.iter().any(|x| x.id == id) {
146 let mut apply = false;
147 for (idx, migration) in existing_migrations.iter().enumerate() {
148 if apply {
149 apply_migration(&db, migration, last_migration_table_name).await?;
150 info!("Applied migration {:04}_{}", migration.id, migration.name);
151 continue;
152 }
153
154 if migration.id == id {
155 apply = true;
156
157 if idx == existing_migrations.len() - 1 {
158 info!("All migration have already been applied.");
159 }
160 }
161
162 if let Some(apply_until) = apply_until {
163 match migration.id.cmp(&apply_until) {
164 Ordering::Equal => {
165 if apply {
166 info!(
167 "Applied all migrations until (inclusive) migration {apply_until:04}"
168 );
169 } else {
170 info!(
171 "All migrations until (inclusive) migration {apply_until:04} have already been applied"
172 );
173 }
174 break;
175 }
176 Ordering::Greater => break,
177 Ordering::Less => {}
178 }
179 }
180 }
181 } else {
182 return Err(anyhow!(
185 r#"Last applied migration {id} was not found in current migrations.
186
187Can not proceed any further without damaging data.
188To correct, empty the {last_migration_table_name} table or reset the whole database."#,
189 ));
190 }
191 }
192 }
193
194 db.close().await;
195 Ok(())
196}
197
198pub async fn run_migrate(options: MigrateOptions) -> anyhow::Result<()> {
200 let db_conf_path = Path::new(options.database_config.as_str());
201
202 if !&db_conf_path.exists() {
203 error!(
204 "Couldn't find the database configuration file, created {} and exiting",
205 options.database_config.as_str()
206 );
207 create_db_config(db_conf_path)?;
208 return Ok(());
209 }
210
211 let db_conf = deserialize_db_conf(db_conf_path)?;
212
213 run_migrate_custom(db_conf, options.migration_dir, options.apply_until).await
214}