1use crate::config::Config;
2use crate::opt::{AddMigrationOpts, ConnectOpts, MigrationSourceOpt};
3use anyhow::{bail, Context};
4use console::style;
5use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, MigrationType, Migrator};
6use sqlx::Connection;
7use std::borrow::Cow;
8use std::collections::{HashMap, HashSet};
9use std::fmt::Write;
10use std::fs::{self, File};
11use std::path::Path;
12use std::time::Duration;
13
14pub async fn add(opts: AddMigrationOpts) -> anyhow::Result<()> {
15 let config = opts.config.load_config().await?;
16
17 let source = opts.source.resolve_path(&config);
18
19 fs::create_dir_all(source).context("Unable to create migrations directory")?;
20
21 let migrator = opts.source.resolve(&config).await?;
22
23 let version_prefix = opts.version_prefix(&config, &migrator);
24
25 if opts.reversible(&config, &migrator) {
26 create_file(
27 source,
28 &version_prefix,
29 &opts.description,
30 MigrationType::ReversibleUp,
31 )?;
32 create_file(
33 source,
34 &version_prefix,
35 &opts.description,
36 MigrationType::ReversibleDown,
37 )?;
38 } else {
39 create_file(
40 source,
41 &version_prefix,
42 &opts.description,
43 MigrationType::Simple,
44 )?;
45 }
46
47 let has_existing_migrations = fs::read_dir(source)
49 .map(|mut dir| dir.next().is_some())
50 .unwrap_or(false);
51
52 if !has_existing_migrations {
53 let quoted_source = if opts.source.source.is_some() {
54 format!("{source:?}")
55 } else {
56 "".to_string()
57 };
58
59 let version = if let (Some(major), Some(minor)) = (
62 option_env!("CARGO_PKG_VERSION_MAJOR"),
64 option_env!("CARGO_PKG_VERSION_MINOR"),
65 ) {
66 format!("{major}.{minor}")
67 } else {
68 "latest".to_string()
70 };
71
72 print!(
73 r#"
74Congratulations on creating your first migration!
75
76Did you know you can embed your migrations in your application binary?
77On startup, after creating your database connection or pool, add:
78
79sqlx::migrate!({quoted_source}).run(<&your_pool OR &mut your_connection>).await?;
80
81Note that the compiler won't pick up new migrations if no Rust source files have changed.
82You can create a Cargo build script to work around this with `sqlx migrate build-script`.
83
84See: https://docs.rs/sqlx/{version}/sqlx/macro.migrate.html
85"#,
86 );
87 }
88
89 Ok(())
90}
91
92fn create_file(
93 migration_source: &str,
94 file_prefix: &str,
95 description: &str,
96 migration_type: MigrationType,
97) -> anyhow::Result<()> {
98 use std::path::PathBuf;
99
100 let mut file_name = file_prefix.to_string();
101 file_name.push('_');
102 file_name.push_str(&description.replace(' ', "_"));
103 file_name.push_str(migration_type.suffix());
104
105 let mut path = PathBuf::new();
106 path.push(migration_source);
107 path.push(&file_name);
108
109 println!("Creating {}", style(path.display()).cyan());
110
111 let mut file = File::create(&path).context("Failed to create migration file")?;
112
113 std::io::Write::write_all(&mut file, migration_type.file_content().as_bytes())?;
114
115 Ok(())
116}
117
118fn short_checksum(checksum: &[u8]) -> String {
119 let mut s = String::with_capacity(checksum.len() * 2);
120 for b in checksum {
121 write!(&mut s, "{b:02x?}").expect("should not fail to write to str");
122 }
123 s
124}
125
126pub async fn info(
127 config: &Config,
128 migration_source: &MigrationSourceOpt,
129 connect_opts: &ConnectOpts,
130) -> anyhow::Result<()> {
131 let migrator = migration_source.resolve(config).await?;
132
133 let mut conn = crate::connect(config, connect_opts).await?;
134
135 for schema_name in &config.migrate.create_schemas {
137 conn.create_schema_if_not_exists(schema_name).await?;
138 }
139
140 conn.ensure_migrations_table(config.migrate.table_name())
141 .await?;
142
143 let applied_migrations: HashMap<_, _> = conn
144 .list_applied_migrations(config.migrate.table_name())
145 .await?
146 .into_iter()
147 .map(|m| (m.version, m))
148 .collect();
149
150 for migration in migrator.iter() {
151 if migration.migration_type.is_down_migration() {
152 continue;
154 }
155
156 let applied = applied_migrations.get(&migration.version);
157
158 let (status_msg, mismatched_checksum) = if let Some(applied) = applied {
159 if applied.checksum != migration.checksum {
160 (style("installed (different checksum)").red(), true)
161 } else {
162 (style("installed").green(), false)
163 }
164 } else {
165 (style("pending").yellow(), false)
166 };
167
168 println!(
169 "{}/{} {}",
170 style(migration.version).cyan(),
171 status_msg,
172 migration.description
173 );
174
175 if mismatched_checksum {
176 println!(
177 "applied migration had checksum {}",
178 short_checksum(
179 &applied
180 .map(|a| a.checksum.clone())
181 .unwrap_or_else(|| Cow::Owned(vec![]))
182 ),
183 );
184 println!(
185 "local migration has checksum {}",
186 short_checksum(&migration.checksum)
187 )
188 }
189 }
190
191 let _ = conn.close().await;
192
193 Ok(())
194}
195
196fn validate_applied_migrations(
197 applied_migrations: &[AppliedMigration],
198 migrator: &Migrator,
199 ignore_missing: bool,
200) -> Result<(), MigrateError> {
201 if ignore_missing {
202 return Ok(());
203 }
204
205 let migrations: HashSet<_> = migrator.iter().map(|m| m.version).collect();
206
207 for applied_migration in applied_migrations {
208 if !migrations.contains(&applied_migration.version) {
209 return Err(MigrateError::VersionMissing(applied_migration.version));
210 }
211 }
212
213 Ok(())
214}
215
216pub async fn run(
217 config: &Config,
218 migration_source: &MigrationSourceOpt,
219 connect_opts: &ConnectOpts,
220 dry_run: bool,
221 ignore_missing: bool,
222 target_version: Option<i64>,
223) -> anyhow::Result<()> {
224 let migrator = migration_source.resolve(config).await?;
225
226 if let Some(target_version) = target_version {
227 if !migrator.version_exists(target_version) {
228 bail!(MigrateError::VersionNotPresent(target_version));
229 }
230 }
231
232 let mut conn = crate::connect(config, connect_opts).await?;
233
234 for schema_name in &config.migrate.create_schemas {
235 conn.create_schema_if_not_exists(schema_name).await?;
236 }
237
238 conn.ensure_migrations_table(config.migrate.table_name())
239 .await?;
240
241 let version = conn.dirty_version(config.migrate.table_name()).await?;
242 if let Some(version) = version {
243 bail!(MigrateError::Dirty(version));
244 }
245
246 let applied_migrations = conn
247 .list_applied_migrations(config.migrate.table_name())
248 .await?;
249 validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
250
251 let latest_version = applied_migrations
252 .iter()
253 .max_by(|x, y| x.version.cmp(&y.version))
254 .map(|migration| migration.version)
255 .unwrap_or(0);
256 if let Some(target_version) = target_version {
257 if target_version < latest_version {
258 bail!(MigrateError::VersionTooOld(target_version, latest_version));
259 }
260 }
261
262 let applied_migrations: HashMap<_, _> = applied_migrations
263 .into_iter()
264 .map(|m| (m.version, m))
265 .collect();
266
267 for migration in migrator.iter() {
268 if migration.migration_type.is_down_migration() {
269 continue;
271 }
272
273 match applied_migrations.get(&migration.version) {
274 Some(applied_migration) => {
275 if migration.checksum != applied_migration.checksum {
276 bail!(MigrateError::VersionMismatch(migration.version));
277 }
278 }
279 None => {
280 let skip =
281 target_version.is_some_and(|target_version| migration.version > target_version);
282
283 let elapsed = if dry_run || skip {
284 Duration::new(0, 0)
285 } else {
286 conn.apply(config.migrate.table_name(), migration).await?
287 };
288 let text = if skip {
289 "Skipped"
290 } else if dry_run {
291 "Can apply"
292 } else {
293 "Applied"
294 };
295
296 println!(
297 "{} {}/{} {} {}",
298 text,
299 style(migration.version).cyan(),
300 style(migration.migration_type.label()).green(),
301 migration.description,
302 style(format!("({elapsed:?})")).dim()
303 );
304 }
305 }
306 }
307
308 let _ = conn.close().await;
314
315 Ok(())
316}
317
318pub async fn revert(
319 config: &Config,
320 migration_source: &MigrationSourceOpt,
321 connect_opts: &ConnectOpts,
322 dry_run: bool,
323 ignore_missing: bool,
324 target_version: Option<i64>,
325) -> anyhow::Result<()> {
326 let migrator = migration_source.resolve(config).await?;
327
328 if let Some(target_version) = target_version {
329 if target_version != 0 && !migrator.version_exists(target_version) {
330 bail!(MigrateError::VersionNotPresent(target_version));
331 }
332 }
333
334 let mut conn = crate::connect(config, connect_opts).await?;
335
336 for schema_name in &config.migrate.create_schemas {
338 conn.create_schema_if_not_exists(schema_name).await?;
339 }
340
341 conn.ensure_migrations_table(config.migrate.table_name())
342 .await?;
343
344 let version = conn.dirty_version(config.migrate.table_name()).await?;
345 if let Some(version) = version {
346 bail!(MigrateError::Dirty(version));
347 }
348
349 let applied_migrations = conn
350 .list_applied_migrations(config.migrate.table_name())
351 .await?;
352 validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
353
354 let latest_version = applied_migrations
355 .iter()
356 .max_by(|x, y| x.version.cmp(&y.version))
357 .map(|migration| migration.version)
358 .unwrap_or(0);
359 if let Some(target_version) = target_version {
360 if target_version > latest_version {
361 bail!(MigrateError::VersionTooNew(target_version, latest_version));
362 }
363 }
364
365 let applied_migrations: HashMap<_, _> = applied_migrations
366 .into_iter()
367 .map(|m| (m.version, m))
368 .collect();
369
370 let mut is_applied = false;
371 for migration in migrator.iter().rev() {
372 if !migration.migration_type.is_down_migration() {
373 continue;
376 }
377
378 if applied_migrations.contains_key(&migration.version) {
379 let skip =
380 target_version.is_some_and(|target_version| migration.version <= target_version);
381
382 let elapsed = if dry_run || skip {
383 Duration::new(0, 0)
384 } else {
385 conn.revert(config.migrate.table_name(), migration).await?
386 };
387 let text = if skip {
388 "Skipped"
389 } else if dry_run {
390 "Can apply"
391 } else {
392 "Applied"
393 };
394
395 println!(
396 "{} {}/{} {} {}",
397 text,
398 style(migration.version).cyan(),
399 style(migration.migration_type.label()).green(),
400 migration.description,
401 style(format!("({elapsed:?})")).dim()
402 );
403
404 is_applied = true;
405
406 if target_version.is_none() {
409 break;
410 }
411 }
412 }
413 if !is_applied {
414 println!("No migrations available to revert");
415 }
416
417 let _ = conn.close().await;
418
419 Ok(())
420}
421
422pub fn build_script(
423 config: &Config,
424 migration_source: &MigrationSourceOpt,
425 force: bool,
426) -> anyhow::Result<()> {
427 let source = migration_source.resolve_path(config);
428
429 anyhow::ensure!(
430 Path::new("Cargo.toml").exists(),
431 "must be run in a Cargo project root"
432 );
433
434 anyhow::ensure!(
435 (force || !Path::new("build.rs").exists()),
436 "build.rs already exists; use --force to overwrite"
437 );
438
439 let contents = format!(
440 r#"// generated by `sqlx migrate build-script`
441fn main() {{
442 // trigger recompilation when a new migration is added
443 println!("cargo:rerun-if-changed={source}");
444}}
445"#,
446 );
447
448 fs::write("build.rs", contents)?;
449
450 println!("Created `build.rs`; be sure to check it into version control!");
451
452 Ok(())
453}