Skip to main content

sqlx_cli/
migrate.rs

1use crate::config::Config;
2use crate::opt::{AddMigrationOpts, ConnectOpts, MigrationSourceOpt};
3use anyhow::{bail, Context};
4use console::style;
5use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, MigrationType, Migrator};
6use sqlx::Connection;
7use std::borrow::Cow;
8use std::collections::{HashMap, HashSet};
9use std::fmt::Write;
10use std::fs::{self, File};
11use std::path::Path;
12use std::time::Duration;
13
14pub async fn add(opts: AddMigrationOpts) -> anyhow::Result<()> {
15    let config = opts.config.load_config().await?;
16
17    let source = opts.source.resolve_path(&config);
18
19    fs::create_dir_all(source).context("Unable to create migrations directory")?;
20
21    let migrator = opts.source.resolve(&config).await?;
22
23    let version_prefix = opts.version_prefix(&config, &migrator);
24
25    if opts.reversible(&config, &migrator) {
26        create_file(
27            source,
28            &version_prefix,
29            &opts.description,
30            MigrationType::ReversibleUp,
31        )?;
32        create_file(
33            source,
34            &version_prefix,
35            &opts.description,
36            MigrationType::ReversibleDown,
37        )?;
38    } else {
39        create_file(
40            source,
41            &version_prefix,
42            &opts.description,
43            MigrationType::Simple,
44        )?;
45    }
46
47    // if the migrations directory is empty
48    let has_existing_migrations = fs::read_dir(source)
49        .map(|mut dir| dir.next().is_some())
50        .unwrap_or(false);
51
52    if !has_existing_migrations {
53        let quoted_source = if opts.source.source.is_some() {
54            format!("{source:?}")
55        } else {
56            "".to_string()
57        };
58
59        // Provide a link to the current version in case the details change.
60        // Patch version is deliberately omitted.
61        let version = if let (Some(major), Some(minor)) = (
62            // Don't fail if we're not being built by Cargo
63            option_env!("CARGO_PKG_VERSION_MAJOR"),
64            option_env!("CARGO_PKG_VERSION_MINOR"),
65        ) {
66            format!("{major}.{minor}")
67        } else {
68            // If a version isn't available, "latest" is fine.
69            "latest".to_string()
70        };
71
72        print!(
73            r#"
74Congratulations on creating your first migration!
75
76Did you know you can embed your migrations in your application binary?
77On startup, after creating your database connection or pool, add:
78
79sqlx::migrate!({quoted_source}).run(<&your_pool OR &mut your_connection>).await?;
80
81Note that the compiler won't pick up new migrations if no Rust source files have changed.
82You can create a Cargo build script to work around this with `sqlx migrate build-script`.
83
84See: https://docs.rs/sqlx/{version}/sqlx/macro.migrate.html
85"#,
86        );
87    }
88
89    Ok(())
90}
91
92fn create_file(
93    migration_source: &str,
94    file_prefix: &str,
95    description: &str,
96    migration_type: MigrationType,
97) -> anyhow::Result<()> {
98    use std::path::PathBuf;
99
100    let mut file_name = file_prefix.to_string();
101    file_name.push('_');
102    file_name.push_str(&description.replace(' ', "_"));
103    file_name.push_str(migration_type.suffix());
104
105    let mut path = PathBuf::new();
106    path.push(migration_source);
107    path.push(&file_name);
108
109    println!("Creating {}", style(path.display()).cyan());
110
111    let mut file = File::create(&path).context("Failed to create migration file")?;
112
113    std::io::Write::write_all(&mut file, migration_type.file_content().as_bytes())?;
114
115    Ok(())
116}
117
/// Renders `checksum` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string.
fn short_checksum(checksum: &[u8]) -> String {
    checksum.iter().fold(
        // Each byte expands to exactly two hex digits.
        String::with_capacity(checksum.len() * 2),
        |mut hex, byte| {
            write!(hex, "{byte:02x?}").expect("should not fail to write to str");
            hex
        },
    )
}
125
126pub async fn info(
127    config: &Config,
128    migration_source: &MigrationSourceOpt,
129    connect_opts: &ConnectOpts,
130) -> anyhow::Result<()> {
131    let migrator = migration_source.resolve(config).await?;
132
133    let mut conn = crate::connect(config, connect_opts).await?;
134
135    // FIXME: we shouldn't actually be creating anything here
136    for schema_name in &config.migrate.create_schemas {
137        conn.create_schema_if_not_exists(schema_name).await?;
138    }
139
140    conn.ensure_migrations_table(config.migrate.table_name())
141        .await?;
142
143    let applied_migrations: HashMap<_, _> = conn
144        .list_applied_migrations(config.migrate.table_name())
145        .await?
146        .into_iter()
147        .map(|m| (m.version, m))
148        .collect();
149
150    for migration in migrator.iter() {
151        if migration.migration_type.is_down_migration() {
152            // Skipping down migrations
153            continue;
154        }
155
156        let applied = applied_migrations.get(&migration.version);
157
158        let (status_msg, mismatched_checksum) = if let Some(applied) = applied {
159            if applied.checksum != migration.checksum {
160                (style("installed (different checksum)").red(), true)
161            } else {
162                (style("installed").green(), false)
163            }
164        } else {
165            (style("pending").yellow(), false)
166        };
167
168        println!(
169            "{}/{} {}",
170            style(migration.version).cyan(),
171            status_msg,
172            migration.description
173        );
174
175        if mismatched_checksum {
176            println!(
177                "applied migration had checksum {}",
178                short_checksum(
179                    &applied
180                        .map(|a| a.checksum.clone())
181                        .unwrap_or_else(|| Cow::Owned(vec![]))
182                ),
183            );
184            println!(
185                "local migration has checksum {}",
186                short_checksum(&migration.checksum)
187            )
188        }
189    }
190
191    let _ = conn.close().await;
192
193    Ok(())
194}
195
196fn validate_applied_migrations(
197    applied_migrations: &[AppliedMigration],
198    migrator: &Migrator,
199    ignore_missing: bool,
200) -> Result<(), MigrateError> {
201    if ignore_missing {
202        return Ok(());
203    }
204
205    let migrations: HashSet<_> = migrator.iter().map(|m| m.version).collect();
206
207    for applied_migration in applied_migrations {
208        if !migrations.contains(&applied_migration.version) {
209            return Err(MigrateError::VersionMissing(applied_migration.version));
210        }
211    }
212
213    Ok(())
214}
215
216pub async fn run(
217    config: &Config,
218    migration_source: &MigrationSourceOpt,
219    connect_opts: &ConnectOpts,
220    dry_run: bool,
221    ignore_missing: bool,
222    target_version: Option<i64>,
223    skip: bool,
224) -> anyhow::Result<()> {
225    let migrator = migration_source.resolve(config).await?;
226
227    if let Some(target_version) = target_version {
228        if !migrator.version_exists(target_version) {
229            bail!(MigrateError::VersionNotPresent(target_version));
230        }
231    }
232
233    let mut conn = crate::connect(config, connect_opts).await?;
234
235    for schema_name in &config.migrate.create_schemas {
236        conn.create_schema_if_not_exists(schema_name).await?;
237    }
238
239    conn.ensure_migrations_table(config.migrate.table_name())
240        .await?;
241
242    let version = conn.dirty_version(config.migrate.table_name()).await?;
243    if let Some(version) = version {
244        bail!(MigrateError::Dirty(version));
245    }
246
247    let applied_migrations = conn
248        .list_applied_migrations(config.migrate.table_name())
249        .await?;
250    validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
251
252    let latest_version = applied_migrations
253        .iter()
254        .max_by(|x, y| x.version.cmp(&y.version))
255        .map(|migration| migration.version)
256        .unwrap_or(0);
257    if let Some(target_version) = target_version {
258        if target_version < latest_version {
259            bail!(MigrateError::VersionTooOld(target_version, latest_version));
260        }
261    }
262
263    let applied_migrations: HashMap<_, _> = applied_migrations
264        .into_iter()
265        .map(|m| (m.version, m))
266        .collect();
267
268    for migration in migrator.iter() {
269        if migration.migration_type.is_down_migration() {
270            // Skipping down migrations
271            continue;
272        }
273
274        match applied_migrations.get(&migration.version) {
275            Some(applied_migration) => {
276                if migration.checksum != applied_migration.checksum {
277                    bail!(MigrateError::VersionMismatch(migration.version));
278                }
279            }
280            None => {
281                let exceeds_target =
282                    target_version.is_some_and(|target_version| migration.version > target_version);
283
284                let elapsed = if dry_run || exceeds_target {
285                    Duration::new(0, 0)
286                } else if skip {
287                    conn.skip(config.migrate.table_name(), migration).await?;
288                    Duration::new(0, 0)
289                } else {
290                    conn.apply(config.migrate.table_name(), migration).await?
291                };
292                let text = if exceeds_target {
293                    "Skipped"
294                } else if dry_run {
295                    "Can apply"
296                } else if skip {
297                    "Skipped on request"
298                } else {
299                    "Applied"
300                };
301
302                println!(
303                    "{} {}/{} {} {}",
304                    text,
305                    style(migration.version).cyan(),
306                    style(migration.migration_type.label()).green(),
307                    migration.description,
308                    style(format!("({elapsed:?})")).dim()
309                );
310            }
311        }
312    }
313
314    // Close the connection before exiting:
315    // * For MySQL and Postgres this should ensure timely cleanup on the server side,
316    //   including decrementing the open connection count.
317    // * For SQLite this should checkpoint and delete the WAL file to ensure the migrations
318    //   were actually applied to the database file and aren't just sitting in the WAL file.
319    let _ = conn.close().await;
320
321    Ok(())
322}
323
324pub async fn revert(
325    config: &Config,
326    migration_source: &MigrationSourceOpt,
327    connect_opts: &ConnectOpts,
328    dry_run: bool,
329    ignore_missing: bool,
330    target_version: Option<i64>,
331) -> anyhow::Result<()> {
332    let migrator = migration_source.resolve(config).await?;
333
334    if let Some(target_version) = target_version {
335        if target_version != 0 && !migrator.version_exists(target_version) {
336            bail!(MigrateError::VersionNotPresent(target_version));
337        }
338    }
339
340    let mut conn = crate::connect(config, connect_opts).await?;
341
342    // FIXME: we should not be creating anything here if it doesn't exist
343    for schema_name in &config.migrate.create_schemas {
344        conn.create_schema_if_not_exists(schema_name).await?;
345    }
346
347    conn.ensure_migrations_table(config.migrate.table_name())
348        .await?;
349
350    let version = conn.dirty_version(config.migrate.table_name()).await?;
351    if let Some(version) = version {
352        bail!(MigrateError::Dirty(version));
353    }
354
355    let applied_migrations = conn
356        .list_applied_migrations(config.migrate.table_name())
357        .await?;
358    validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
359
360    let latest_version = applied_migrations
361        .iter()
362        .max_by(|x, y| x.version.cmp(&y.version))
363        .map(|migration| migration.version)
364        .unwrap_or(0);
365    if let Some(target_version) = target_version {
366        if target_version > latest_version {
367            bail!(MigrateError::VersionTooNew(target_version, latest_version));
368        }
369    }
370
371    let applied_migrations: HashMap<_, _> = applied_migrations
372        .into_iter()
373        .map(|m| (m.version, m))
374        .collect();
375
376    let mut is_applied = false;
377    for migration in migrator.iter().rev() {
378        if !migration.migration_type.is_down_migration() {
379            // Skipping non down migration
380            // This will skip any simple or up migration file
381            continue;
382        }
383
384        if applied_migrations.contains_key(&migration.version) {
385            let skip =
386                target_version.is_some_and(|target_version| migration.version <= target_version);
387
388            let elapsed = if dry_run || skip {
389                Duration::new(0, 0)
390            } else {
391                conn.revert(config.migrate.table_name(), migration).await?
392            };
393            let text = if skip {
394                "Skipped"
395            } else if dry_run {
396                "Can apply"
397            } else {
398                "Applied"
399            };
400
401            println!(
402                "{} {}/{} {} {}",
403                text,
404                style(migration.version).cyan(),
405                style(migration.migration_type.label()).green(),
406                migration.description,
407                style(format!("({elapsed:?})")).dim()
408            );
409
410            is_applied = true;
411
412            // Only a single migration will be reverted at a time if no target
413            // version is supplied, so we break.
414            if target_version.is_none() {
415                break;
416            }
417        }
418    }
419    if !is_applied {
420        println!("No migrations available to revert");
421    }
422
423    let _ = conn.close().await;
424
425    Ok(())
426}
427
428pub fn build_script(
429    config: &Config,
430    migration_source: &MigrationSourceOpt,
431    force: bool,
432) -> anyhow::Result<()> {
433    let source = migration_source.resolve_path(config);
434
435    anyhow::ensure!(
436        Path::new("Cargo.toml").exists(),
437        "must be run in a Cargo project root"
438    );
439
440    anyhow::ensure!(
441        (force || !Path::new("build.rs").exists()),
442        "build.rs already exists; use --force to overwrite"
443    );
444
445    let contents = format!(
446        r#"// generated by `sqlx migrate build-script`
447fn main() {{
448    // trigger recompilation when a new migration is added
449    println!("cargo:rerun-if-changed={source}");
450}}
451"#,
452    );
453
454    fs::write("build.rs", contents)?;
455
456    println!("Created `build.rs`; be sure to check it into version control!");
457
458    Ok(())
459}