1use std::cmp::Ordering;
2use std::path::Path;
3
4use anyhow::{anyhow, Context};
5use rorm_db::executor::{Executor, Nothing, Optional};
6use rorm_db::sql::create_table::CreateTable;
7use rorm_db::sql::insert::Insert;
8use rorm_db::sql::DBImpl;
9use rorm_db::Database;
10use rorm_declaration::config::DatabaseConfig;
11use rorm_declaration::imr::{Annotation, DbType};
12use rorm_declaration::migration::Migration;
13
14use crate::log_sql;
15use crate::migrate::config::{create_db_config, deserialize_db_conf};
16use crate::migrate::sql_builder::migration_to_sql;
17use crate::utils::migrations::get_existing_migrations;
18
19pub mod config;
20pub mod sql_builder;
21
/// Options accepted by [`run_migrate`].
#[derive(Debug, Clone)]
pub struct MigrateOptions {
    /// Path of the directory that contains the migration files.
    pub migration_dir: String,

    /// Path of the database configuration file.
    ///
    /// If no file exists at this path, [`run_migrate`] creates one and exits.
    pub database_config: String,

    /// When `true`, the SQL statements that are executed are printed to stdout.
    pub log_queries: bool,

    /// Id of the last migration to apply (inclusive); `None` applies every
    /// pending migration.
    pub apply_until: Option<u16>,
}
36
37pub async fn apply_migration(
43 dialect: DBImpl,
44 migration: &Migration,
45 db: &Database,
46 last_migration_table_name: &str,
47 do_log: bool,
48) -> anyhow::Result<()> {
49 let mut tx = db
50 .start_transaction()
51 .await
52 .with_context(|| format!("Error while starting transaction {}", migration.id))?;
53
54 if let Err(e) = migration_to_sql(&mut tx, dialect, migration, do_log).await {
55 tx.rollback()
56 .await
57 .with_context(|| "Error while rollback in transaction")?;
58 return Err(e);
59 }
60
61 let v: &[&[rorm_db::sql::value::Value]] =
62 &[&[rorm_db::sql::value::Value::I32(migration.id as i32)]];
63 let (query_string, bind_params) = dialect
64 .insert(last_migration_table_name, &["migration_id"], v, None)
65 .rollback_transaction()
66 .build();
67
68 if do_log {
69 println!("{query_string}");
70 }
71
72 tx.execute::<Nothing>(query_string, bind_params).await.with_context(|| {
73 format!(
74 "Error while inserting applied migration {last_migration_table_name} into last migration table",
75 )
76 })?;
77
78 println!("Applied migration {:04}_{}", migration.id, migration.name);
79
80 tx.commit().await.with_context(|| {
81 format!("Error while committing transaction {last_migration_table_name}",)
82 })?;
83
84 Ok(())
85}
86
87pub async fn run_migrate_custom(
89 db_conf: DatabaseConfig,
90 migration_dir: String,
91 log_sql: bool,
92 apply_until: Option<u16>,
93) -> anyhow::Result<()> {
94 let p = Path::new(migration_dir.as_str());
95 if !p.exists() || p.is_file() {
96 println!(
97 "Couldn't find the migration directory in {} \n\n\
98 You can specify an alternative path with --migration-dir <PATH>",
99 migration_dir.as_str()
100 );
101 return Ok(());
102 }
103
104 let existing_migrations = get_existing_migrations(migration_dir.as_str())
105 .with_context(|| "Couldn't retrieve existing migrations")?;
106
107 if existing_migrations.is_empty() {
108 println!("No migrations found.\nExiting.");
109 return Ok(());
110 }
111
112 let db = Database::connect(rorm_db::DatabaseConfiguration {
113 driver: db_conf.driver,
114 min_connections: 1,
115 max_connections: 1,
116 })
117 .await?;
118
119 let last_migration_table_name = db_conf
120 .last_migration_table_name
121 .as_ref()
122 .map_or("_rorm__last_migration", |x| x.as_str());
123
124 let db_impl = (&db).dialect();
125 let statements = db_impl
126 .create_table(last_migration_table_name)
127 .add_column(db_impl.create_column(
128 last_migration_table_name,
129 "id",
130 DbType::Int64,
131 &[Annotation::PrimaryKey, Annotation::AutoIncrement],
132 ))
133 .add_column(db_impl.create_column(
134 last_migration_table_name,
135 "updated_at",
136 DbType::DateTime,
137 &[Annotation::AutoUpdateTime],
138 ))
139 .add_column(db_impl.create_column(
140 last_migration_table_name,
141 "migration_id",
142 DbType::Int32,
143 &[Annotation::NotNull],
144 ))
145 .if_not_exists()
146 .build()?;
147
148 let mut tx = db
149 .start_transaction()
150 .await
151 .with_context(|| "Could not create transaction")?;
152
153 for (query_string, bind_params) in statements {
154 if log_sql {
155 println!("{}", query_string.as_str());
156 }
157
158 tx.execute::<Nothing>(query_string, bind_params)
159 .await
160 .with_context(|| "Couldn't create internal last migration table")?;
161 }
162
163 tx.commit()
164 .await
165 .with_context(|| "Couldn't create internal last migration table")?;
166
167 let last_migration: Option<i32> = db
168 .execute::<Optional>(
169 log_sql!(
170 format!(
171 "SELECT migration_id FROM {} ORDER BY id DESC LIMIT 1;",
172 &last_migration_table_name
173 ),
174 log_sql
175 ),
176 Vec::new(),
177 )
178 .await
179 .and_then(|option| option.map(|row| row.get(0)).transpose().map_err(Into::into))
180 .with_context(|| {
181 "Couldn't fetch information about successful migrations from migration table"
182 })?;
183
184 match last_migration {
185 None => {
186 for migration in &existing_migrations {
188 apply_migration(db_impl, migration, &db, last_migration_table_name, log_sql)
189 .await?;
190
191 if let Some(apply_until) = apply_until {
192 if migration.id == apply_until {
193 println!(
194 "Applied all migrations until (inclusive) migration {apply_until:04}"
195 );
196 break;
197 }
198 }
199 }
200 }
201 Some(id) => {
202 let id = id as u16;
203 if existing_migrations.iter().any(|x| x.id == id) {
205 let mut apply = false;
206 for (idx, migration) in existing_migrations.iter().enumerate() {
207 if apply {
208 apply_migration(
209 db_impl,
210 migration,
211 &db,
212 last_migration_table_name,
213 log_sql,
214 )
215 .await?;
216 continue;
217 }
218
219 if migration.id == id {
220 apply = true;
221
222 if idx == existing_migrations.len() - 1 {
223 println!("All migration have already been applied.");
224 }
225 }
226
227 if let Some(apply_until) = apply_until {
228 match migration.id.cmp(&apply_until) {
229 Ordering::Equal => {
230 if apply {
231 println!(
232 "Applied all migrations until (inclusive) migration {apply_until:04}"
233 );
234 } else {
235 println!(
236 "All migrations until (inclusive) migration {apply_until:04} have already been applied"
237 );
238 }
239 break;
240 }
241 Ordering::Greater => break,
242 Ordering::Less => {}
243 }
244 }
245 }
246 } else {
247 return Err(anyhow!(
250 r#"Last applied migration {id} was not found in current migrations.
251
252Can not proceed any further without damaging data.
253To correct, empty the {last_migration_table_name} table or reset the whole database."#,
254 ));
255 }
256 }
257 }
258
259 db.close().await;
260 Ok(())
261}
262
263pub async fn run_migrate(options: MigrateOptions) -> anyhow::Result<()> {
265 let db_conf_path = Path::new(options.database_config.as_str());
266
267 if !&db_conf_path.exists() {
268 println!(
269 "Couldn't find the database configuration file, created {} and exiting",
270 options.database_config.as_str()
271 );
272 create_db_config(db_conf_path)?;
273 return Ok(());
274 }
275
276 let db_conf = deserialize_db_conf(db_conf_path)?;
277
278 run_migrate_custom(
279 db_conf,
280 options.migration_dir,
281 options.log_queries,
282 options.apply_until,
283 )
284 .await
285}