1use crate::config::Config;
2use crate::opt::{AddMigrationOpts, ConnectOpts, MigrationSourceOpt};
3use anyhow::{bail, Context};
4use console::style;
5use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, MigrationType, Migrator};
6use sqlx::Connection;
7use std::borrow::Cow;
8use std::collections::{HashMap, HashSet};
9use std::fmt::Write;
10use std::fs::{self, File};
11use std::path::Path;
12use std::time::Duration;
13
14pub async fn add(opts: AddMigrationOpts) -> anyhow::Result<()> {
15 let config = opts.config.load_config().await?;
16
17 let source = opts.source.resolve_path(&config);
18
19 fs::create_dir_all(source).context("Unable to create migrations directory")?;
20
21 let migrator = opts.source.resolve(&config).await?;
22
23 let version_prefix = opts.version_prefix(&config, &migrator);
24
25 if opts.reversible(&config, &migrator) {
26 create_file(
27 source,
28 &version_prefix,
29 &opts.description,
30 MigrationType::ReversibleUp,
31 )?;
32 create_file(
33 source,
34 &version_prefix,
35 &opts.description,
36 MigrationType::ReversibleDown,
37 )?;
38 } else {
39 create_file(
40 source,
41 &version_prefix,
42 &opts.description,
43 MigrationType::Simple,
44 )?;
45 }
46
47 let has_existing_migrations = fs::read_dir(source)
49 .map(|mut dir| dir.next().is_some())
50 .unwrap_or(false);
51
52 if !has_existing_migrations {
53 let quoted_source = if opts.source.source.is_some() {
54 format!("{source:?}")
55 } else {
56 "".to_string()
57 };
58
59 let version = if let (Some(major), Some(minor)) = (
62 option_env!("CARGO_PKG_VERSION_MAJOR"),
64 option_env!("CARGO_PKG_VERSION_MINOR"),
65 ) {
66 format!("{major}.{minor}")
67 } else {
68 "latest".to_string()
70 };
71
72 print!(
73 r#"
74Congratulations on creating your first migration!
75
76Did you know you can embed your migrations in your application binary?
77On startup, after creating your database connection or pool, add:
78
79sqlx::migrate!({quoted_source}).run(<&your_pool OR &mut your_connection>).await?;
80
81Note that the compiler won't pick up new migrations if no Rust source files have changed.
82You can create a Cargo build script to work around this with `sqlx migrate build-script`.
83
84See: https://docs.rs/sqlx/{version}/sqlx/macro.migrate.html
85"#,
86 );
87 }
88
89 Ok(())
90}
91
92fn create_file(
93 migration_source: &str,
94 file_prefix: &str,
95 description: &str,
96 migration_type: MigrationType,
97) -> anyhow::Result<()> {
98 use std::path::PathBuf;
99
100 let mut file_name = file_prefix.to_string();
101 file_name.push('_');
102 file_name.push_str(&description.replace(' ', "_"));
103 file_name.push_str(migration_type.suffix());
104
105 let mut path = PathBuf::new();
106 path.push(migration_source);
107 path.push(&file_name);
108
109 println!("Creating {}", style(path.display()).cyan());
110
111 let mut file = File::create(&path).context("Failed to create migration file")?;
112
113 std::io::Write::write_all(&mut file, migration_type.file_content().as_bytes())?;
114
115 Ok(())
116}
117
/// Renders `checksum` as a lowercase hexadecimal string, two digits per byte.
fn short_checksum(checksum: &[u8]) -> String {
    checksum.iter().fold(
        String::with_capacity(checksum.len() * 2),
        |mut hex, byte| {
            write!(hex, "{byte:02x}").expect("should not fail to write to str");
            hex
        },
    )
}
125
126pub async fn info(
127 config: &Config,
128 migration_source: &MigrationSourceOpt,
129 connect_opts: &ConnectOpts,
130) -> anyhow::Result<()> {
131 let migrator = migration_source.resolve(config).await?;
132
133 let mut conn = crate::connect(config, connect_opts).await?;
134
135 for schema_name in &config.migrate.create_schemas {
137 conn.create_schema_if_not_exists(schema_name).await?;
138 }
139
140 conn.ensure_migrations_table(config.migrate.table_name())
141 .await?;
142
143 let applied_migrations: HashMap<_, _> = conn
144 .list_applied_migrations(config.migrate.table_name())
145 .await?
146 .into_iter()
147 .map(|m| (m.version, m))
148 .collect();
149
150 for migration in migrator.iter() {
151 if migration.migration_type.is_down_migration() {
152 continue;
154 }
155
156 let applied = applied_migrations.get(&migration.version);
157
158 let (status_msg, mismatched_checksum) = if let Some(applied) = applied {
159 if applied.checksum != migration.checksum {
160 (style("installed (different checksum)").red(), true)
161 } else {
162 (style("installed").green(), false)
163 }
164 } else {
165 (style("pending").yellow(), false)
166 };
167
168 println!(
169 "{}/{} {}",
170 style(migration.version).cyan(),
171 status_msg,
172 migration.description
173 );
174
175 if mismatched_checksum {
176 println!(
177 "applied migration had checksum {}",
178 short_checksum(
179 &applied
180 .map(|a| a.checksum.clone())
181 .unwrap_or_else(|| Cow::Owned(vec![]))
182 ),
183 );
184 println!(
185 "local migration has checksum {}",
186 short_checksum(&migration.checksum)
187 )
188 }
189 }
190
191 let _ = conn.close().await;
192
193 Ok(())
194}
195
196fn validate_applied_migrations(
197 applied_migrations: &[AppliedMigration],
198 migrator: &Migrator,
199 ignore_missing: bool,
200) -> Result<(), MigrateError> {
201 if ignore_missing {
202 return Ok(());
203 }
204
205 let migrations: HashSet<_> = migrator.iter().map(|m| m.version).collect();
206
207 for applied_migration in applied_migrations {
208 if !migrations.contains(&applied_migration.version) {
209 return Err(MigrateError::VersionMissing(applied_migration.version));
210 }
211 }
212
213 Ok(())
214}
215
216pub async fn run(
217 config: &Config,
218 migration_source: &MigrationSourceOpt,
219 connect_opts: &ConnectOpts,
220 dry_run: bool,
221 ignore_missing: bool,
222 target_version: Option<i64>,
223 skip: bool,
224) -> anyhow::Result<()> {
225 let migrator = migration_source.resolve(config).await?;
226
227 if let Some(target_version) = target_version {
228 if !migrator.version_exists(target_version) {
229 bail!(MigrateError::VersionNotPresent(target_version));
230 }
231 }
232
233 let mut conn = crate::connect(config, connect_opts).await?;
234
235 for schema_name in &config.migrate.create_schemas {
236 conn.create_schema_if_not_exists(schema_name).await?;
237 }
238
239 conn.ensure_migrations_table(config.migrate.table_name())
240 .await?;
241
242 let version = conn.dirty_version(config.migrate.table_name()).await?;
243 if let Some(version) = version {
244 bail!(MigrateError::Dirty(version));
245 }
246
247 let applied_migrations = conn
248 .list_applied_migrations(config.migrate.table_name())
249 .await?;
250 validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
251
252 let latest_version = applied_migrations
253 .iter()
254 .max_by(|x, y| x.version.cmp(&y.version))
255 .map(|migration| migration.version)
256 .unwrap_or(0);
257 if let Some(target_version) = target_version {
258 if target_version < latest_version {
259 bail!(MigrateError::VersionTooOld(target_version, latest_version));
260 }
261 }
262
263 let applied_migrations: HashMap<_, _> = applied_migrations
264 .into_iter()
265 .map(|m| (m.version, m))
266 .collect();
267
268 for migration in migrator.iter() {
269 if migration.migration_type.is_down_migration() {
270 continue;
272 }
273
274 match applied_migrations.get(&migration.version) {
275 Some(applied_migration) => {
276 if migration.checksum != applied_migration.checksum {
277 bail!(MigrateError::VersionMismatch(migration.version));
278 }
279 }
280 None => {
281 let exceeds_target =
282 target_version.is_some_and(|target_version| migration.version > target_version);
283
284 let elapsed = if dry_run || exceeds_target {
285 Duration::new(0, 0)
286 } else if skip {
287 conn.skip(config.migrate.table_name(), migration).await?;
288 Duration::new(0, 0)
289 } else {
290 conn.apply(config.migrate.table_name(), migration).await?
291 };
292 let text = if exceeds_target {
293 "Skipped"
294 } else if dry_run {
295 "Can apply"
296 } else if skip {
297 "Skipped on request"
298 } else {
299 "Applied"
300 };
301
302 println!(
303 "{} {}/{} {} {}",
304 text,
305 style(migration.version).cyan(),
306 style(migration.migration_type.label()).green(),
307 migration.description,
308 style(format!("({elapsed:?})")).dim()
309 );
310 }
311 }
312 }
313
314 let _ = conn.close().await;
320
321 Ok(())
322}
323
324pub async fn revert(
325 config: &Config,
326 migration_source: &MigrationSourceOpt,
327 connect_opts: &ConnectOpts,
328 dry_run: bool,
329 ignore_missing: bool,
330 target_version: Option<i64>,
331) -> anyhow::Result<()> {
332 let migrator = migration_source.resolve(config).await?;
333
334 if let Some(target_version) = target_version {
335 if target_version != 0 && !migrator.version_exists(target_version) {
336 bail!(MigrateError::VersionNotPresent(target_version));
337 }
338 }
339
340 let mut conn = crate::connect(config, connect_opts).await?;
341
342 for schema_name in &config.migrate.create_schemas {
344 conn.create_schema_if_not_exists(schema_name).await?;
345 }
346
347 conn.ensure_migrations_table(config.migrate.table_name())
348 .await?;
349
350 let version = conn.dirty_version(config.migrate.table_name()).await?;
351 if let Some(version) = version {
352 bail!(MigrateError::Dirty(version));
353 }
354
355 let applied_migrations = conn
356 .list_applied_migrations(config.migrate.table_name())
357 .await?;
358 validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
359
360 let latest_version = applied_migrations
361 .iter()
362 .max_by(|x, y| x.version.cmp(&y.version))
363 .map(|migration| migration.version)
364 .unwrap_or(0);
365 if let Some(target_version) = target_version {
366 if target_version > latest_version {
367 bail!(MigrateError::VersionTooNew(target_version, latest_version));
368 }
369 }
370
371 let applied_migrations: HashMap<_, _> = applied_migrations
372 .into_iter()
373 .map(|m| (m.version, m))
374 .collect();
375
376 let mut is_applied = false;
377 for migration in migrator.iter().rev() {
378 if !migration.migration_type.is_down_migration() {
379 continue;
382 }
383
384 if applied_migrations.contains_key(&migration.version) {
385 let skip =
386 target_version.is_some_and(|target_version| migration.version <= target_version);
387
388 let elapsed = if dry_run || skip {
389 Duration::new(0, 0)
390 } else {
391 conn.revert(config.migrate.table_name(), migration).await?
392 };
393 let text = if skip {
394 "Skipped"
395 } else if dry_run {
396 "Can apply"
397 } else {
398 "Applied"
399 };
400
401 println!(
402 "{} {}/{} {} {}",
403 text,
404 style(migration.version).cyan(),
405 style(migration.migration_type.label()).green(),
406 migration.description,
407 style(format!("({elapsed:?})")).dim()
408 );
409
410 is_applied = true;
411
412 if target_version.is_none() {
415 break;
416 }
417 }
418 }
419 if !is_applied {
420 println!("No migrations available to revert");
421 }
422
423 let _ = conn.close().await;
424
425 Ok(())
426}
427
428pub fn build_script(
429 config: &Config,
430 migration_source: &MigrationSourceOpt,
431 force: bool,
432) -> anyhow::Result<()> {
433 let source = migration_source.resolve_path(config);
434
435 anyhow::ensure!(
436 Path::new("Cargo.toml").exists(),
437 "must be run in a Cargo project root"
438 );
439
440 anyhow::ensure!(
441 (force || !Path::new("build.rs").exists()),
442 "build.rs already exists; use --force to overwrite"
443 );
444
445 let contents = format!(
446 r#"// generated by `sqlx migrate build-script`
447fn main() {{
448 // trigger recompilation when a new migration is added
449 println!("cargo:rerun-if-changed={source}");
450}}
451"#,
452 );
453
454 fs::write("build.rs", contents)?;
455
456 println!("Created `build.rs`; be sure to check it into version control!");
457
458 Ok(())
459}