Skip to main content

sqlx_cli/
migrate.rs

1use crate::config::Config;
2use crate::opt::{AddMigrationOpts, ConnectOpts, MigrationSourceOpt};
3use anyhow::{bail, Context};
4use console::style;
5use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, MigrationType, Migrator};
6use sqlx::Connection;
7use std::borrow::Cow;
8use std::collections::{HashMap, HashSet};
9use std::fmt::Write;
10use std::fs::{self, File};
11use std::path::Path;
12use std::time::Duration;
13
14pub async fn add(opts: AddMigrationOpts) -> anyhow::Result<()> {
15    let config = opts.config.load_config().await?;
16
17    let source = opts.source.resolve_path(&config);
18
19    fs::create_dir_all(source).context("Unable to create migrations directory")?;
20
21    let migrator = opts.source.resolve(&config).await?;
22
23    let version_prefix = opts.version_prefix(&config, &migrator);
24
25    if opts.reversible(&config, &migrator) {
26        create_file(
27            source,
28            &version_prefix,
29            &opts.description,
30            MigrationType::ReversibleUp,
31        )?;
32        create_file(
33            source,
34            &version_prefix,
35            &opts.description,
36            MigrationType::ReversibleDown,
37        )?;
38    } else {
39        create_file(
40            source,
41            &version_prefix,
42            &opts.description,
43            MigrationType::Simple,
44        )?;
45    }
46
47    // if the migrations directory is empty
48    let has_existing_migrations = fs::read_dir(source)
49        .map(|mut dir| dir.next().is_some())
50        .unwrap_or(false);
51
52    if !has_existing_migrations {
53        let quoted_source = if opts.source.source.is_some() {
54            format!("{source:?}")
55        } else {
56            "".to_string()
57        };
58
59        // Provide a link to the current version in case the details change.
60        // Patch version is deliberately omitted.
61        let version = if let (Some(major), Some(minor)) = (
62            // Don't fail if we're not being built by Cargo
63            option_env!("CARGO_PKG_VERSION_MAJOR"),
64            option_env!("CARGO_PKG_VERSION_MINOR"),
65        ) {
66            format!("{major}.{minor}")
67        } else {
68            // If a version isn't available, "latest" is fine.
69            "latest".to_string()
70        };
71
72        print!(
73            r#"
74Congratulations on creating your first migration!
75
76Did you know you can embed your migrations in your application binary?
77On startup, after creating your database connection or pool, add:
78
79sqlx::migrate!({quoted_source}).run(<&your_pool OR &mut your_connection>).await?;
80
81Note that the compiler won't pick up new migrations if no Rust source files have changed.
82You can create a Cargo build script to work around this with `sqlx migrate build-script`.
83
84See: https://docs.rs/sqlx/{version}/sqlx/macro.migrate.html
85"#,
86        );
87    }
88
89    Ok(())
90}
91
92fn create_file(
93    migration_source: &str,
94    file_prefix: &str,
95    description: &str,
96    migration_type: MigrationType,
97) -> anyhow::Result<()> {
98    use std::path::PathBuf;
99
100    let mut file_name = file_prefix.to_string();
101    file_name.push('_');
102    file_name.push_str(&description.replace(' ', "_"));
103    file_name.push_str(migration_type.suffix());
104
105    let mut path = PathBuf::new();
106    path.push(migration_source);
107    path.push(&file_name);
108
109    println!("Creating {}", style(path.display()).cyan());
110
111    let mut file = File::create(&path).context("Failed to create migration file")?;
112
113    std::io::Write::write_all(&mut file, migration_type.file_content().as_bytes())?;
114
115    Ok(())
116}
117
/// Renders `checksum` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string.
fn short_checksum(checksum: &[u8]) -> String {
    checksum
        .iter()
        .fold(String::with_capacity(checksum.len() * 2), |mut hex, byte| {
            write!(hex, "{byte:02x}").expect("should not fail to write to str");
            hex
        })
}
125
126pub async fn info(
127    config: &Config,
128    migration_source: &MigrationSourceOpt,
129    connect_opts: &ConnectOpts,
130) -> anyhow::Result<()> {
131    let migrator = migration_source.resolve(config).await?;
132
133    let mut conn = crate::connect(config, connect_opts).await?;
134
135    // FIXME: we shouldn't actually be creating anything here
136    for schema_name in &config.migrate.create_schemas {
137        conn.create_schema_if_not_exists(schema_name).await?;
138    }
139
140    conn.ensure_migrations_table(config.migrate.table_name())
141        .await?;
142
143    let applied_migrations: HashMap<_, _> = conn
144        .list_applied_migrations(config.migrate.table_name())
145        .await?
146        .into_iter()
147        .map(|m| (m.version, m))
148        .collect();
149
150    for migration in migrator.iter() {
151        if migration.migration_type.is_down_migration() {
152            // Skipping down migrations
153            continue;
154        }
155
156        let applied = applied_migrations.get(&migration.version);
157
158        let (status_msg, mismatched_checksum) = if let Some(applied) = applied {
159            if applied.checksum != migration.checksum {
160                (style("installed (different checksum)").red(), true)
161            } else {
162                (style("installed").green(), false)
163            }
164        } else {
165            (style("pending").yellow(), false)
166        };
167
168        println!(
169            "{}/{} {}",
170            style(migration.version).cyan(),
171            status_msg,
172            migration.description
173        );
174
175        if mismatched_checksum {
176            println!(
177                "applied migration had checksum {}",
178                short_checksum(
179                    &applied
180                        .map(|a| a.checksum.clone())
181                        .unwrap_or_else(|| Cow::Owned(vec![]))
182                ),
183            );
184            println!(
185                "local migration has checksum {}",
186                short_checksum(&migration.checksum)
187            )
188        }
189    }
190
191    let _ = conn.close().await;
192
193    Ok(())
194}
195
196fn validate_applied_migrations(
197    applied_migrations: &[AppliedMigration],
198    migrator: &Migrator,
199    ignore_missing: bool,
200) -> Result<(), MigrateError> {
201    if ignore_missing {
202        return Ok(());
203    }
204
205    let migrations: HashSet<_> = migrator.iter().map(|m| m.version).collect();
206
207    for applied_migration in applied_migrations {
208        if !migrations.contains(&applied_migration.version) {
209            return Err(MigrateError::VersionMissing(applied_migration.version));
210        }
211    }
212
213    Ok(())
214}
215
216pub async fn run(
217    config: &Config,
218    migration_source: &MigrationSourceOpt,
219    connect_opts: &ConnectOpts,
220    dry_run: bool,
221    ignore_missing: bool,
222    target_version: Option<i64>,
223) -> anyhow::Result<()> {
224    let migrator = migration_source.resolve(config).await?;
225
226    if let Some(target_version) = target_version {
227        if !migrator.version_exists(target_version) {
228            bail!(MigrateError::VersionNotPresent(target_version));
229        }
230    }
231
232    let mut conn = crate::connect(config, connect_opts).await?;
233
234    for schema_name in &config.migrate.create_schemas {
235        conn.create_schema_if_not_exists(schema_name).await?;
236    }
237
238    conn.ensure_migrations_table(config.migrate.table_name())
239        .await?;
240
241    let version = conn.dirty_version(config.migrate.table_name()).await?;
242    if let Some(version) = version {
243        bail!(MigrateError::Dirty(version));
244    }
245
246    let applied_migrations = conn
247        .list_applied_migrations(config.migrate.table_name())
248        .await?;
249    validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
250
251    let latest_version = applied_migrations
252        .iter()
253        .max_by(|x, y| x.version.cmp(&y.version))
254        .map(|migration| migration.version)
255        .unwrap_or(0);
256    if let Some(target_version) = target_version {
257        if target_version < latest_version {
258            bail!(MigrateError::VersionTooOld(target_version, latest_version));
259        }
260    }
261
262    let applied_migrations: HashMap<_, _> = applied_migrations
263        .into_iter()
264        .map(|m| (m.version, m))
265        .collect();
266
267    for migration in migrator.iter() {
268        if migration.migration_type.is_down_migration() {
269            // Skipping down migrations
270            continue;
271        }
272
273        match applied_migrations.get(&migration.version) {
274            Some(applied_migration) => {
275                if migration.checksum != applied_migration.checksum {
276                    bail!(MigrateError::VersionMismatch(migration.version));
277                }
278            }
279            None => {
280                let skip =
281                    target_version.is_some_and(|target_version| migration.version > target_version);
282
283                let elapsed = if dry_run || skip {
284                    Duration::new(0, 0)
285                } else {
286                    conn.apply(config.migrate.table_name(), migration).await?
287                };
288                let text = if skip {
289                    "Skipped"
290                } else if dry_run {
291                    "Can apply"
292                } else {
293                    "Applied"
294                };
295
296                println!(
297                    "{} {}/{} {} {}",
298                    text,
299                    style(migration.version).cyan(),
300                    style(migration.migration_type.label()).green(),
301                    migration.description,
302                    style(format!("({elapsed:?})")).dim()
303                );
304            }
305        }
306    }
307
308    // Close the connection before exiting:
309    // * For MySQL and Postgres this should ensure timely cleanup on the server side,
310    //   including decrementing the open connection count.
311    // * For SQLite this should checkpoint and delete the WAL file to ensure the migrations
312    //   were actually applied to the database file and aren't just sitting in the WAL file.
313    let _ = conn.close().await;
314
315    Ok(())
316}
317
318pub async fn revert(
319    config: &Config,
320    migration_source: &MigrationSourceOpt,
321    connect_opts: &ConnectOpts,
322    dry_run: bool,
323    ignore_missing: bool,
324    target_version: Option<i64>,
325) -> anyhow::Result<()> {
326    let migrator = migration_source.resolve(config).await?;
327
328    if let Some(target_version) = target_version {
329        if target_version != 0 && !migrator.version_exists(target_version) {
330            bail!(MigrateError::VersionNotPresent(target_version));
331        }
332    }
333
334    let mut conn = crate::connect(config, connect_opts).await?;
335
336    // FIXME: we should not be creating anything here if it doesn't exist
337    for schema_name in &config.migrate.create_schemas {
338        conn.create_schema_if_not_exists(schema_name).await?;
339    }
340
341    conn.ensure_migrations_table(config.migrate.table_name())
342        .await?;
343
344    let version = conn.dirty_version(config.migrate.table_name()).await?;
345    if let Some(version) = version {
346        bail!(MigrateError::Dirty(version));
347    }
348
349    let applied_migrations = conn
350        .list_applied_migrations(config.migrate.table_name())
351        .await?;
352    validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
353
354    let latest_version = applied_migrations
355        .iter()
356        .max_by(|x, y| x.version.cmp(&y.version))
357        .map(|migration| migration.version)
358        .unwrap_or(0);
359    if let Some(target_version) = target_version {
360        if target_version > latest_version {
361            bail!(MigrateError::VersionTooNew(target_version, latest_version));
362        }
363    }
364
365    let applied_migrations: HashMap<_, _> = applied_migrations
366        .into_iter()
367        .map(|m| (m.version, m))
368        .collect();
369
370    let mut is_applied = false;
371    for migration in migrator.iter().rev() {
372        if !migration.migration_type.is_down_migration() {
373            // Skipping non down migration
374            // This will skip any simple or up migration file
375            continue;
376        }
377
378        if applied_migrations.contains_key(&migration.version) {
379            let skip =
380                target_version.is_some_and(|target_version| migration.version <= target_version);
381
382            let elapsed = if dry_run || skip {
383                Duration::new(0, 0)
384            } else {
385                conn.revert(config.migrate.table_name(), migration).await?
386            };
387            let text = if skip {
388                "Skipped"
389            } else if dry_run {
390                "Can apply"
391            } else {
392                "Applied"
393            };
394
395            println!(
396                "{} {}/{} {} {}",
397                text,
398                style(migration.version).cyan(),
399                style(migration.migration_type.label()).green(),
400                migration.description,
401                style(format!("({elapsed:?})")).dim()
402            );
403
404            is_applied = true;
405
406            // Only a single migration will be reverted at a time if no target
407            // version is supplied, so we break.
408            if target_version.is_none() {
409                break;
410            }
411        }
412    }
413    if !is_applied {
414        println!("No migrations available to revert");
415    }
416
417    let _ = conn.close().await;
418
419    Ok(())
420}
421
422pub fn build_script(
423    config: &Config,
424    migration_source: &MigrationSourceOpt,
425    force: bool,
426) -> anyhow::Result<()> {
427    let source = migration_source.resolve_path(config);
428
429    anyhow::ensure!(
430        Path::new("Cargo.toml").exists(),
431        "must be run in a Cargo project root"
432    );
433
434    anyhow::ensure!(
435        (force || !Path::new("build.rs").exists()),
436        "build.rs already exists; use --force to overwrite"
437    );
438
439    let contents = format!(
440        r#"// generated by `sqlx migrate build-script`
441fn main() {{
442    // trigger recompilation when a new migration is added
443    println!("cargo:rerun-if-changed={source}");
444}}
445"#,
446    );
447
448    fs::write("build.rs", contents)?;
449
450    println!("Created `build.rs`; be sure to check it into version control!");
451
452    Ok(())
453}