1use std::cmp::Ordering;
2use std::path::Path;
3
4use anyhow::{anyhow, Context};
5use rorm_db::executor::{Executor, Nothing, Optional};
6use rorm_db::sql::create_table::CreateTable;
7use rorm_db::sql::insert::Insert;
8use rorm_db::sql::DBImpl;
9use rorm_db::Database;
10use rorm_declaration::config::DatabaseConfig;
11use rorm_declaration::imr::{Annotation, DbType};
12use rorm_declaration::migration::Migration;
13
14use crate::log_sql;
15use crate::migrate::config::{create_db_config, deserialize_db_conf};
16use crate::migrate::sql_builder::migration_to_sql;
17use crate::utils::migrations::get_existing_migrations;
18
19pub mod config;
20pub mod sql_builder;
21
22pub struct MigrateOptions {
24 pub migration_dir: String,
26
27 pub database_config: String,
29
30 pub log_queries: bool,
32
33 pub apply_until: Option<u16>,
35}
36
37pub async fn apply_migration(
43 dialect: DBImpl,
44 migration: &Migration,
45 db: &Database,
46 last_migration_table_name: &str,
47 do_log: bool,
48) -> anyhow::Result<()> {
49 let mut tx = db
50 .start_transaction()
51 .await
52 .with_context(|| format!("Error while starting transaction {}", migration.id))?;
53
54 if let Err(e) = migration_to_sql(&mut tx, dialect, migration, do_log).await {
55 tx.rollback()
56 .await
57 .with_context(|| "Error while rollback in transaction")?;
58 return Err(e);
59 }
60
61 let v: &[&[rorm_db::sql::value::Value]] =
62 &[&[rorm_db::sql::value::Value::I32(migration.id as i32)]];
63 let (query_string, bind_params) = dialect
64 .insert(last_migration_table_name, &["migration_id"], v, None)
65 .rollback_transaction()
66 .build();
67
68 if do_log {
69 println!("{query_string}");
70 }
71
72 tx.execute::<Nothing>(query_string, bind_params).await.with_context(|| {
73 format!(
74 "Error while inserting applied migration {last_migration_table_name} into last migration table",
75 )
76 })?;
77
78 println!("Applied migration {:04}_{}", migration.id, migration.name);
79
80 tx.commit().await.with_context(|| {
81 format!("Error while committing transaction {last_migration_table_name}",)
82 })?;
83
84 Ok(())
85}
86
87pub async fn run_migrate_custom(
89 db_conf: DatabaseConfig,
90 migration_dir: String,
91 log_sql: bool,
92 apply_until: Option<u16>,
93) -> anyhow::Result<()> {
94 let p = Path::new(migration_dir.as_str());
95 if !p.exists() || p.is_file() {
96 println!(
97 "Couldn't find the migration directory in {} \n\n\
98 You can specify an alternative path with --migration-dir <PATH>",
99 migration_dir.as_str()
100 );
101 return Ok(());
102 }
103
104 let existing_migrations = get_existing_migrations(migration_dir.as_str())
105 .with_context(|| "Couldn't retrieve existing migrations")?;
106
107 if existing_migrations.is_empty() {
108 println!("No migrations found.\nExiting.");
109 return Ok(());
110 }
111
112 let pool = Database::connect(rorm_db::DatabaseConfiguration {
113 driver: db_conf.driver,
114 min_connections: 1,
115 max_connections: 1,
116 })
117 .await?;
118
119 let last_migration_table_name = db_conf
120 .last_migration_table_name
121 .as_ref()
122 .map_or("_rorm__last_migration", |x| x.as_str());
123
124 let db_impl = (&pool).dialect();
125 let statements = db_impl
126 .create_table(last_migration_table_name)
127 .add_column(db_impl.create_column(
128 last_migration_table_name,
129 "id",
130 DbType::Int64,
131 &[Annotation::PrimaryKey, Annotation::AutoIncrement],
132 ))
133 .add_column(db_impl.create_column(
134 last_migration_table_name,
135 "updated_at",
136 DbType::DateTime,
137 &[Annotation::AutoUpdateTime],
138 ))
139 .add_column(db_impl.create_column(
140 last_migration_table_name,
141 "migration_id",
142 DbType::Int32,
143 &[Annotation::NotNull],
144 ))
145 .if_not_exists()
146 .build()?;
147
148 let mut tx = pool
149 .start_transaction()
150 .await
151 .with_context(|| "Could not create transaction")?;
152
153 for (query_string, bind_params) in statements {
154 if log_sql {
155 println!("{}", query_string.as_str());
156 }
157
158 tx.execute::<Nothing>(query_string, bind_params)
159 .await
160 .with_context(|| "Couldn't create internal last migration table")?;
161 }
162
163 tx.commit()
164 .await
165 .with_context(|| "Couldn't create internal last migration table")?;
166
167 let last_migration: Option<i32> = pool
168 .execute::<Optional>(
169 log_sql!(
170 format!(
171 "SELECT migration_id FROM {} ORDER BY id DESC LIMIT 1;",
172 &last_migration_table_name
173 ),
174 log_sql
175 ),
176 Vec::new(),
177 )
178 .await
179 .and_then(|option| option.map(|row| row.get(0)).transpose().map_err(Into::into))
180 .with_context(|| {
181 "Couldn't fetch information about successful migrations from migration table"
182 })?;
183
184 match last_migration {
185 None => {
186 for migration in &existing_migrations {
188 apply_migration(
189 db_impl,
190 migration,
191 &pool,
192 last_migration_table_name,
193 log_sql,
194 )
195 .await?;
196
197 if let Some(apply_until) = apply_until {
198 if migration.id == apply_until {
199 println!(
200 "Applied all migrations until (inclusive) migration {apply_until:04}"
201 );
202 break;
203 }
204 }
205 }
206 }
207 Some(id) => {
208 let id = id as u16;
209 if existing_migrations.iter().any(|x| x.id == id) {
211 let mut apply = false;
212 for (idx, migration) in existing_migrations.iter().enumerate() {
213 if apply {
214 apply_migration(
215 db_impl,
216 migration,
217 &pool,
218 last_migration_table_name,
219 log_sql,
220 )
221 .await?;
222 continue;
223 }
224
225 if migration.id == id {
226 apply = true;
227
228 if idx == existing_migrations.len() - 1 {
229 println!("All migration have already been applied.");
230 }
231 }
232
233 if let Some(apply_until) = apply_until {
234 match migration.id.cmp(&apply_until) {
235 Ordering::Equal => {
236 if apply {
237 println!(
238 "Applied all migrations until (inclusive) migration {apply_until:04}"
239 );
240 } else {
241 println!(
242 "All migrations until (inclusive) migration {apply_until:04} have already been applied"
243 );
244 }
245 break;
246 }
247 Ordering::Greater => break,
248 Ordering::Less => {}
249 }
250 }
251 }
252 } else {
253 return Err(anyhow!(
256 r#"Last applied migration {id} was not found in current migrations.
257
258Can not proceed any further without damaging data.
259To correct, empty the {last_migration_table_name} table or reset the whole database."#,
260 ));
261 }
262 }
263 }
264
265 Ok(())
266}
267
268pub async fn run_migrate(options: MigrateOptions) -> anyhow::Result<()> {
270 let db_conf_path = Path::new(options.database_config.as_str());
271
272 if !&db_conf_path.exists() {
273 println!(
274 "Couldn't find the database configuration file, created {} and exiting",
275 options.database_config.as_str()
276 );
277 create_db_config(db_conf_path)?;
278 return Ok(());
279 }
280
281 let db_conf = deserialize_db_conf(db_conf_path)?;
282
283 run_migrate_custom(
284 db_conf,
285 options.migration_dir,
286 options.log_queries,
287 options.apply_until,
288 )
289 .await
290}