Skip to main content

spawn_db/commands/migration/
apply.rs

1use crate::commands::migration::get_pending_and_confirm;
2use crate::commands::{Command, Outcome, TelemetryDescribe, TelemetryInfo};
3use crate::config::Config;
4use crate::engine::MigrationError;
5use crate::migrator::Migrator;
6use crate::variables::Variables;
7use anyhow::{anyhow, Result};
8
9pub struct ApplyMigration {
10    pub migration: Option<String>,
11    pub pinned: bool,
12    pub variables: Option<Variables>,
13    pub yes: bool,
14    pub retry: bool,
15}
16
17impl TelemetryDescribe for ApplyMigration {
18    fn telemetry(&self) -> TelemetryInfo {
19        TelemetryInfo::new("migration apply").with_properties(vec![
20            ("opt_pinned", self.pinned.to_string()),
21            ("has_variables", self.variables.is_some().to_string()),
22            ("apply_all", self.migration.is_none().to_string()),
23        ])
24    }
25}
26
27impl Command for ApplyMigration {
28    async fn execute(&self, config: &Config) -> Result<Outcome> {
29        let migrations = match &self.migration {
30            Some(migration) => vec![migration.clone()],
31            None => match get_pending_and_confirm(config, "apply", self.yes).await? {
32                Some(pending) => pending,
33                None => return Ok(Outcome::AppliedMigrations),
34            },
35        };
36
37        let total = migrations.len();
38        for (i, migration) in migrations.into_iter().enumerate() {
39            let counter = if total > 1 {
40                format!(
41                    "[{:>width$}/{}] ",
42                    i + 1,
43                    total,
44                    width = total.to_string().len()
45                )
46            } else {
47                String::new()
48            };
49            let mgrtr = Migrator::new(config, &migration, self.pinned);
50            match mgrtr.generate_streaming(self.variables.clone()).await {
51                Ok(streaming) => {
52                    let engine = config.new_engine().await?;
53                    let write_fn = streaming.into_writer_fn();
54                    match engine
55                        .migration_apply(
56                            &migration,
57                            write_fn,
58                            None,
59                            super::DEFAULT_NAMESPACE,
60                            self.retry,
61                        )
62                        .await
63                    {
64                        Ok(_) => {
65                            println!("{}Migration '{}' applied successfully", counter, &migration);
66                        }
67                        Err(MigrationError::AlreadyApplied { info, .. }) => {
68                            println!(
69                                "{}Migration '{}' already applied (status: {}, checksum: {})",
70                                counter, &migration, info.last_status, info.checksum
71                            );
72                        }
73                        Err(MigrationError::PreviousAttemptFailed { status, info, .. }) => {
74                            return Err(anyhow!(
75                                "Migration '{}' has a previous {} attempt (checksum: {}).\n\
76                                 Use `spawn migration apply --retry {}` to retry.",
77                                &migration,
78                                status,
79                                info.checksum,
80                                &migration,
81                            ));
82                        }
83                        Err(MigrationError::Database(e)) => {
84                            return Err(
85                                e.context(format!("Failed applying migration {}", &migration))
86                            );
87                        }
88                        Err(MigrationError::AdvisoryLock(e)) => {
89                            return Err(
90                                anyhow!("Unable to obtain advisory lock for migration").context(e)
91                            );
92                        }
93                        Err(e @ MigrationError::NotRecorded { .. }) => {
94                            return Err(anyhow!("{}", e));
95                        }
96                    }
97                }
98                Err(e) => {
99                    let context = if self.pinned {
100                        anyhow!(
101                            "Failed to generate migration '{}'. Is it pinned? \
102                             Run `spawn migration pin {}` or use `--no-pin` to apply without pinning.",
103                            &migration, &migration
104                        )
105                    } else {
106                        anyhow!("failed to generate migration '{}'", &migration)
107                    };
108                    return Err(e.context(context));
109                }
110            };
111        }
112        Ok(Outcome::AppliedMigrations)
113    }
114}