// spawn_db/commands/migration/mod.rs
mod adopt;
2mod apply;
3mod build;
4mod new;
5mod pin;
6mod status;
7
8pub use adopt::AdoptMigration;
9pub use apply::ApplyMigration;
10pub use build::BuildMigration;
11pub use new::NewMigration;
12pub use pin::PinMigration;
13pub use status::MigrationStatus;
14
/// Namespace name `"default"`.
///
/// `get_pending_and_confirm` in this module always looks up migration status
/// under this namespace.
pub const DEFAULT_NAMESPACE: &str = "default";
16
17use crate::config::Config;
18use crate::engine::{MigrationDbInfo, MigrationHistoryStatus};
19use anyhow::{Context, Result};
20use dialoguer::Confirm;
21use futures::TryStreamExt;
22use regex::Regex;
23use std::collections::{HashMap, HashSet};
24
25#[derive(Debug, Clone)]
27pub struct MigrationStatusRow {
28 pub migration_name: String,
29 pub exists_in_filesystem: bool,
30 pub is_pinned: bool,
31 pub exists_in_db: bool,
32 pub last_status: Option<MigrationHistoryStatus>,
33 pub last_activity: Option<String>,
34 pub checksum: Option<String>,
35}
36
37pub async fn get_combined_migration_status(
41 config: &Config,
42 namespace: Option<&str>,
43) -> Result<Vec<MigrationStatusRow>> {
44 let engine = config.new_engine().await?;
45 let op = config.operator();
46
47 let pather = config.pather();
49 let migrations_folder = pather.migrations_folder();
50 let migrations_prefix = format!("{}/", migrations_folder.trim_start_matches('/'));
51
52 let mut lister = op
54 .lister_with(&migrations_prefix)
55 .recursive(true)
56 .await
57 .context("listing migrations recursively")?;
58
59 let mut fs_migration_names: HashSet<String> = HashSet::new();
61 let mut pinned_migrations: HashSet<String> = HashSet::new();
62
63 let up_sql_re = Regex::new(r"(?P<name>[^/]+)/up\.sql$").expect("valid regex");
64 let lock_toml_re = Regex::new(r"(?P<name>[^/]+)/lock\.toml$").expect("valid regex");
65
66 while let Some(entry) = lister.try_next().await? {
67 let path = entry.path().to_string();
68 if let Some(caps) = up_sql_re.captures(&path) {
69 fs_migration_names.insert(caps["name"].to_string());
70 }
71 if let Some(caps) = lock_toml_re.captures(&path) {
72 pinned_migrations.insert(caps["name"].to_string());
73 }
74 }
75
76 let db_migrations_list = engine.get_migrations_from_db(namespace).await?;
78
79 let db_migrations: HashMap<String, MigrationDbInfo> = db_migrations_list
81 .into_iter()
82 .map(|info| (info.migration_name.clone(), info))
83 .collect();
84
85 let all_migration_names: HashSet<String> = fs_migration_names
87 .iter()
88 .chain(db_migrations.keys())
89 .chain(pinned_migrations.iter())
90 .cloned()
91 .collect();
92
93 let mut results: Vec<MigrationStatusRow> = all_migration_names
94 .into_iter()
95 .map(|name| {
96 let exists_in_fs = fs_migration_names.contains(&name);
97 let db_info = db_migrations.get(&name);
98
99 MigrationStatusRow {
100 migration_name: name.clone(),
101 exists_in_filesystem: exists_in_fs,
102 is_pinned: pinned_migrations.contains(&name),
103 exists_in_db: db_info.is_some(),
104 last_status: db_info.and_then(|info| info.last_status),
105 last_activity: db_info.and_then(|info| info.last_activity.clone()),
106 checksum: db_info.and_then(|info| info.checksum.clone()),
107 }
108 })
109 .collect();
110
111 results.sort_by(|a, b| a.migration_name.cmp(&b.migration_name));
113
114 Ok(results)
115}
116
117pub async fn get_pending_and_confirm(
121 config: &Config,
122 action: &str,
123 yes: bool,
124) -> Result<Option<Vec<String>>> {
125 let status_rows = get_combined_migration_status(config, Some(DEFAULT_NAMESPACE)).await?;
126
127 let pending: Vec<String> = status_rows
128 .into_iter()
129 .filter(|row| row.last_status.is_none() && row.exists_in_filesystem)
130 .map(|row| row.migration_name)
131 .collect();
132
133 if pending.is_empty() {
134 println!("No pending migrations to {}.", action);
135 return Ok(None);
136 }
137
138 let db_config = config.db_config()?;
139 let target = config.database.as_deref().unwrap_or("unknown");
140 let env = &db_config.environment;
141
142 println!();
143 println!("TARGET: {}", target);
144 if env.starts_with("prod") {
145 println!("ENVIRONMENT: {} \u{26a0}\u{fe0f}", env);
146 } else {
147 println!("ENVIRONMENT: {}", env);
148 }
149 println!();
150 println!(
151 "The following {} migration{} will be {}:",
152 pending.len(),
153 if pending.len() == 1 { "" } else { "s" },
154 if action == "apply" {
155 "applied"
156 } else {
157 "adopted"
158 },
159 );
160 for (i, name) in pending.iter().enumerate() {
161 println!(" {}. {}", i + 1, name);
162 }
163 println!();
164
165 if !yes {
166 let prompt = format!("Do you want to {} these migrations?", action);
167 let confirmed = Confirm::new()
168 .with_prompt(prompt)
169 .default(false)
170 .interact()?;
171
172 if !confirmed {
173 println!("Aborted.");
174 return Ok(None);
175 }
176 }
177
178 println!();
179 Ok(Some(pending))
180}