spawn_db/commands/migration/
apply.rs1use crate::commands::migration::get_pending_and_confirm;
2use crate::commands::{Command, Outcome, TelemetryDescribe, TelemetryInfo};
3use crate::config::Config;
4use crate::engine::MigrationError;
5use crate::migrator::Migrator;
6use crate::variables::Variables;
7use anyhow::{anyhow, Result};
8
9pub struct ApplyMigration {
10 pub migration: Option<String>,
11 pub pinned: bool,
12 pub variables: Option<Variables>,
13 pub yes: bool,
14 pub retry: bool,
15}
16
17impl TelemetryDescribe for ApplyMigration {
18 fn telemetry(&self) -> TelemetryInfo {
19 TelemetryInfo::new("migration apply").with_properties(vec![
20 ("opt_pinned", self.pinned.to_string()),
21 ("has_variables", self.variables.is_some().to_string()),
22 ("apply_all", self.migration.is_none().to_string()),
23 ])
24 }
25}
26
27impl Command for ApplyMigration {
28 async fn execute(&self, config: &Config) -> Result<Outcome> {
29 let migrations = match &self.migration {
30 Some(migration) => vec![migration.clone()],
31 None => match get_pending_and_confirm(config, "apply", self.yes).await? {
32 Some(pending) => pending,
33 None => return Ok(Outcome::AppliedMigrations),
34 },
35 };
36
37 let total = migrations.len();
38 for (i, migration) in migrations.into_iter().enumerate() {
39 let counter = if total > 1 {
40 format!(
41 "[{:>width$}/{}] ",
42 i + 1,
43 total,
44 width = total.to_string().len()
45 )
46 } else {
47 String::new()
48 };
49 let mgrtr = Migrator::new(config, &migration, self.pinned);
50 match mgrtr.generate_streaming(self.variables.clone()).await {
51 Ok(streaming) => {
52 let engine = config.new_engine().await?;
53 let write_fn = streaming.into_writer_fn();
54 match engine
55 .migration_apply(
56 &migration,
57 write_fn,
58 None,
59 super::DEFAULT_NAMESPACE,
60 self.retry,
61 )
62 .await
63 {
64 Ok(_) => {
65 println!("{}Migration '{}' applied successfully", counter, &migration);
66 }
67 Err(MigrationError::AlreadyApplied { info, .. }) => {
68 println!(
69 "{}Migration '{}' already applied (status: {}, checksum: {})",
70 counter, &migration, info.last_status, info.checksum
71 );
72 }
73 Err(MigrationError::PreviousAttemptFailed { status, info, .. }) => {
74 return Err(anyhow!(
75 "Migration '{}' has a previous {} attempt (checksum: {}).\n\
76 Use `spawn migration apply --retry {}` to retry.",
77 &migration,
78 status,
79 info.checksum,
80 &migration,
81 ));
82 }
83 Err(MigrationError::Database(e)) => {
84 return Err(
85 e.context(format!("Failed applying migration {}", &migration))
86 );
87 }
88 Err(MigrationError::AdvisoryLock(e)) => {
89 return Err(
90 anyhow!("Unable to obtain advisory lock for migration").context(e)
91 );
92 }
93 Err(e @ MigrationError::NotRecorded { .. }) => {
94 return Err(anyhow!("{}", e));
95 }
96 }
97 }
98 Err(e) => {
99 let context = if self.pinned {
100 anyhow!(
101 "Failed to generate migration '{}'. Is it pinned? \
102 Run `spawn migration pin {}` or use `--no-pin` to apply without pinning.",
103 &migration, &migration
104 )
105 } else {
106 anyhow!("failed to generate migration '{}'", &migration)
107 };
108 return Err(e.context(context));
109 }
110 };
111 }
112 Ok(Outcome::AppliedMigrations)
113 }
114}