1use anyhow::{anyhow, Context, Result};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::fmt;
5use thiserror::Error;
6use tokio::process::Command;
7
8pub mod postgres_psql;
9
/// Outcome recorded for a migration run.
///
/// Every variant has a canonical upper-case textual form (`SUCCESS`,
/// `ATTEMPTED`, `FAILURE`) produced by [`MigrationStatus::as_str`] and by the
/// `Display` impl, and accepted back by `str::parse` /
/// [`MigrationStatus::from_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationStatus {
    Success,
    Attempted,
    Failure,
}

impl MigrationStatus {
    /// Returns the canonical upper-case name of this status.
    pub fn as_str(&self) -> &'static str {
        match self {
            MigrationStatus::Success => "SUCCESS",
            MigrationStatus::Attempted => "ATTEMPTED",
            MigrationStatus::Failure => "FAILURE",
        }
    }

    /// Parses the canonical upper-case name back into a status.
    ///
    /// Matching is exact (case-sensitive, no trimming). Returns `None` for
    /// any other input. Prefer `s.parse::<MigrationStatus>()` in new code when
    /// the rejected input should be reported in an error.
    // Kept as an inherent method (same name as `FromStr::from_str`) so that
    // existing callers relying on the `Option` return type keep compiling.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        s.parse::<Self>().ok()
    }
}

/// Error returned when a string is not one of the canonical status names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseMigrationStatusError {
    /// The rejected input, kept for the error message.
    input: String,
}

impl fmt::Display for ParseMigrationStatusError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "unknown migration status '{}'", self.input)
    }
}

impl std::error::Error for ParseMigrationStatusError {}

impl std::str::FromStr for MigrationStatus {
    type Err = ParseMigrationStatusError;

    /// Exact, case-sensitive inverse of [`MigrationStatus::as_str`].
    // `std::result::Result` is spelled out because this file imports
    // `anyhow::Result` under the bare name `Result`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            "SUCCESS" => Ok(MigrationStatus::Success),
            "ATTEMPTED" => Ok(MigrationStatus::Attempted),
            "FAILURE" => Ok(MigrationStatus::Failure),
            other => Err(ParseMigrationStatusError {
                input: other.to_string(),
            }),
        }
    }
}

impl fmt::Display for MigrationStatus {
    /// Writes the same text as [`MigrationStatus::as_str`].
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
44
/// Kind of operation performed on a migration.
///
/// Each variant renders as its upper-case name (`APPLY`, `ADOPT`, `REVERT`)
/// through both [`MigrationActivity::as_str`] and `Display`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationActivity {
    Apply,
    Adopt,
    Revert,
}

impl MigrationActivity {
    /// Returns the canonical upper-case name of this activity.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Apply => "APPLY",
            Self::Adopt => "ADOPT",
            Self::Revert => "REVERT",
        }
    }
}

impl fmt::Display for MigrationActivity {
    /// Writes the same text as [`MigrationActivity::as_str`].
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
69
/// Alias of [`MigrationStatus`] used where a status is read back from recorded
/// migration history (see `ExistingMigrationInfo`, `MigrationDbInfo` and
/// `MigrationError::PreviousAttemptFailed`).
pub type MigrationHistoryStatus = MigrationStatus;
72
/// Recorded state of a migration that already has history.
///
/// Carried as the `info` payload of `MigrationError::AlreadyApplied` and
/// `MigrationError::PreviousAttemptFailed`.
#[derive(Debug, Clone)]
pub struct ExistingMigrationInfo {
    /// Name of the migration.
    pub migration_name: String,
    /// Namespace the migration is recorded under.
    pub namespace: String,
    /// Most recent recorded status.
    pub last_status: MigrationHistoryStatus,
    /// Most recent recorded activity, as text.
    /// NOTE(review): presumably one of the `MigrationActivity::as_str` values — confirm.
    pub last_activity: String,
    /// Checksum recorded for the migration.
    /// NOTE(review): algorithm and encoding are not visible in this file.
    pub checksum: String,
}
82
/// Per-migration record as returned by `Engine::get_migrations_from_db`.
///
/// Every field except the name is optional.
/// NOTE(review): `None` presumably means no history has been recorded for
/// the migration yet — confirm against the engine implementation.
#[derive(Debug, Clone)]
pub struct MigrationDbInfo {
    /// Name of the migration.
    pub migration_name: String,
    /// Most recent recorded status, if any.
    pub last_status: Option<MigrationHistoryStatus>,
    /// Most recent recorded activity as text, if any.
    pub last_activity: Option<String>,
    /// Recorded checksum, if any.
    pub checksum: Option<String>,
}
91
/// Errors produced by migration operations (the error half of `MigrationResult`).
#[derive(Debug, Error)]
pub enum MigrationError {
    /// The migration already has a successful record in this namespace.
    #[error("migration '{name}' in namespace '{namespace}' already applied successfully")]
    AlreadyApplied {
        /// Migration name, interpolated into the message.
        name: String,
        /// Namespace, interpolated into the message.
        namespace: String,
        /// Recorded state of the existing migration (not shown in the message).
        info: ExistingMigrationInfo,
    },

    /// The migration has an earlier record whose status is `status`.
    /// NOTE(review): which statuses trigger this is decided by the engine
    /// implementation, not visible here.
    #[error("migration '{name}' in namespace '{namespace}' has previous {status} status")]
    PreviousAttemptFailed {
        /// Migration name, interpolated into the message.
        name: String,
        /// Namespace, interpolated into the message.
        namespace: String,
        /// Previously recorded status, rendered through its `Display` impl.
        status: MigrationHistoryStatus,
        /// Recorded state of the existing migration (not shown in the message).
        info: ExistingMigrationInfo,
    },

    /// Wraps any `anyhow::Error`. Because of `#[from]`, `?` converts every
    /// `anyhow::Error` into this variant, whether or not it originated in
    /// the database layer.
    #[error("database error: {0}")]
    Database(#[from] anyhow::Error),

    /// Acquiring the advisory lock failed. There is no `#[from]` here, so
    /// this variant has to be constructed explicitly.
    #[error("could not get advisory lock: {0}")]
    AdvisoryLock(std::io::Error),

    /// The migration ran, but its outcome could not be recorded. The
    /// multi-line message is built by `format_not_recorded_error`.
    #[error("{}", format_not_recorded_error(.name, .migration_outcome, .migration_error, .recording_error))]
    NotRecorded {
        /// Migration name.
        name: String,
        /// Outcome of the migration itself.
        migration_outcome: MigrationStatus,
        /// Error text from the migration itself, when there is one.
        migration_error: Option<String>,
        /// Error text describing why recording failed.
        recording_error: String,
    },
}
133
134fn format_not_recorded_error(
135 name: &str,
136 migration_outcome: &MigrationStatus,
137 migration_error: &Option<String>,
138 recording_error: &str,
139) -> String {
140 let (outcome_label, consequence, resolve_steps) = match migration_outcome {
141 MigrationStatus::Success => (
142 "SUCCEEDED",
143 format!(
144 "The migration changes ARE in your database, but spawn does not know about them.\n\
145 Re-running this migration may cause errors or duplicate changes."
146 ),
147 format!(
148 "1. Verify the migration was applied by checking your database\n\
149 2. Run `spawn migration adopt {name}` to record the migration\n\
150 3. Investigate why recording failed (connection issue? permissions?)"
151 ),
152 ),
153 _ => (
154 "FAILED",
155 format!(
156 "The migration did NOT apply, but spawn was unable to record the failure.\n\
157 Spawn may not be aware this migration was attempted."
158 ),
159 format!(
160 "1. Check your database to confirm the migration was not applied\n\
161 2. Investigate why recording failed (connection issue? permissions?)\n\
162 3. Re-run the migration once the issue is resolved"
163 ),
164 ),
165 };
166
167 let mut msg = format!(
168 "\n\
169 [ACTION REQUIRED] Migration '{name}' {outcome_label} but the result could not be recorded.\n\
170 \n\
171 {consequence}\n\
172 \n\
173 Recording error: {recording_error}",
174 );
175
176 if let Some(migration_err) = migration_error {
177 msg.push_str(&format!("\nMigration error: {}", migration_err));
178 }
179
180 msg.push_str(&format!(
181 "\n\
182 \n\
183 To resolve:\n\
184 {resolve_steps}\n",
185 ));
186
187 msg
188}
189
/// Result type for migration operations, with [`MigrationError`] as the error.
///
/// `Result` here is the `anyhow::Result` alias imported at the top of the
/// file, used with an explicit error parameter.
pub type MigrationResult<T> = Result<T, MigrationError>;
192
/// Errors returned by `Engine::execute_with_writer`.
#[derive(Debug, Error)]
pub enum EngineError {
    /// Execution finished unsuccessfully; carries the exit code and the
    /// captured stderr text, both shown in the message.
    #[error("execution failed (exit {exit_code}): {stderr}")]
    ExecutionFailed { exit_code: i32, stderr: String },

    /// An I/O error; `#[from]` lets `?` convert `std::io::Error` automatically.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}
202
/// Supported database engine backends.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EngineType {
    /// Serialized as `postgres-psql` (the same text `Display` produces).
    /// NOTE(review): presumably backed by the `postgres_psql` submodule — confirm.
    #[serde(rename = "postgres-psql")]
    PostgresPSQL,
}
208
209impl fmt::Display for EngineType {
210 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
211 match self {
212 EngineType::PostgresPSQL => {
213 write!(f, "postgres-psql")
214 }
215 }
216 }
217}
218
/// How to obtain the command line (argument vector) to run.
///
/// Internally tagged for serde: the `kind` field selects the variant using
/// its snake_case name, e.g. `{"kind": "direct", "direct": ["psql"]}`.
/// Resolved into a plain `Vec<String>` by `resolve_command_spec`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum CommandSpec {
    /// The argument vector is given literally and used as-is.
    Direct { direct: Vec<String> },
    /// The argument vector is obtained by running another command.
    Provider {
        /// Program and arguments to execute; its trimmed stdout is split
        /// with shell-style quoting rules to form the resolved command.
        provider: Vec<String>,
        /// Extra arguments appended after the resolved command; defaults to
        /// empty when omitted.
        #[serde(default)]
        append: Vec<String>,
    },
}
232
/// Serializable configuration for one database target.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct DatabaseConfig {
    /// Engine backend to use.
    pub engine: EngineType,
    /// NOTE(review): presumably the name of the database holding spawn's
    /// bookkeeping — confirm against the engine implementation.
    pub spawn_database: String,
    /// Schema name; defaults to `_spawn` when omitted.
    #[serde(default = "default_schema")]
    pub spawn_schema: String,
    /// Environment label; defaults to `prod` when omitted.
    #[serde(default = "default_environment")]
    pub environment: String,

    /// Optional command specification; `None` when omitted.
    #[serde(default)]
    pub command: Option<CommandSpec>,
}
245
/// Serde default for `DatabaseConfig::environment`: `"prod"`.
fn default_environment() -> String {
    String::from("prod")
}
249
/// Serde default for `DatabaseConfig::spawn_schema`: `"_spawn"`.
fn default_schema() -> String {
    String::from("_spawn")
}
253
254pub async fn resolve_command_spec(spec: CommandSpec) -> Result<Vec<String>> {
259 match spec {
260 CommandSpec::Direct { direct } => Ok(direct),
261 CommandSpec::Provider { provider, append } => {
262 let mut resolved = resolve_provider(&provider).await?;
263 resolved.extend(append);
264 Ok(resolved)
265 }
266 }
267}
268
269async fn resolve_provider(provider: &[String]) -> Result<Vec<String>> {
274 if provider.is_empty() {
275 return Err(anyhow!("Provider command cannot be empty"));
276 }
277
278 let output = Command::new(&provider[0])
279 .args(&provider[1..])
280 .output()
281 .await
282 .context("Failed to execute provider command")?;
283
284 if !output.status.success() {
285 return Err(anyhow!(
286 "Provider command failed (exit {}): {}",
287 output.status.code().unwrap_or(-1),
288 String::from_utf8_lossy(&output.stderr)
289 ));
290 }
291
292 let stdout = String::from_utf8(output.stdout).context("Provider output is not valid UTF-8")?;
293 let trimmed = stdout.trim();
294
295 if trimmed.is_empty() {
296 return Err(anyhow!("Provider returned empty output"));
297 }
298
299 shlex::split(trimmed).ok_or_else(|| anyhow!("Failed to parse shell command: {}", trimmed))
303}
304
/// One-shot, sendable callback that is handed a synchronous writer and
/// reports success or an I/O error.
/// NOTE(review): presumably used to stream input (e.g. SQL) to the engine
/// process — confirm against the `Engine` implementation.
pub type WriterFn = Box<dyn FnOnce(&mut dyn std::io::Write) -> std::io::Result<()> + Send>;
307
/// Optional asynchronous sink passed to `Engine::execute_with_writer`.
/// NOTE(review): presumably receives the engine's stdout, with `None`
/// meaning the output is not forwarded — confirm against the implementation.
pub type StdoutWriter = Option<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>;
310
/// Abstraction over a database engine capable of executing input and
/// tracking migrations.
///
/// Only the signatures are visible in this file; the behavioural notes below
/// are inferred from names and types and should be confirmed against the
/// concrete implementation (e.g. the `postgres_psql` module).
#[async_trait]
pub trait Engine: Send + Sync {
    /// Executes input produced by `write_fn`.
    ///
    /// * `write_fn` — one-shot callback that writes the input to run.
    /// * `stdout_writer` — optional async sink (presumably for the engine's
    ///   stdout — TODO confirm).
    /// * `merge_stderr` — presumably routes stderr into the same sink as
    ///   stdout when `true` — TODO confirm.
    ///
    /// Returns an [`EngineError`] on failure.
    async fn execute_with_writer(
        &self,
        write_fn: WriterFn,
        stdout_writer: StdoutWriter,
        merge_stderr: bool,
    ) -> Result<(), EngineError>;

    /// Applies the migration `migration_name` in `namespace`, with its
    /// content supplied by `write_fn`.
    ///
    /// * `pin_hash` — optional hash. NOTE(review): semantics not visible here.
    /// * `retry` — presumably allows re-running a migration that has a
    ///   previous non-successful record — TODO confirm.
    ///
    /// Returns a `String` on success (NOTE(review): its meaning is defined by
    /// the implementation) or a [`MigrationError`].
    async fn migration_apply(
        &self,
        migration_name: &str,
        write_fn: WriterFn,
        pin_hash: Option<String>,
        namespace: &str,
        retry: bool,
    ) -> MigrationResult<String>;

    /// Adopts the migration `migration_name` in `namespace`, with a
    /// free-text `description`.
    /// NOTE(review): presumably records the migration without executing it
    /// (the `NotRecorded` message directs users to `spawn migration adopt`)
    /// — confirm.
    async fn migration_adopt(
        &self,
        migration_name: &str,
        namespace: &str,
        description: &str,
    ) -> MigrationResult<String>;

    /// Returns the migrations known to the database as [`MigrationDbInfo`]
    /// records.
    ///
    /// `namespace` is optional; `None` presumably means all namespaces —
    /// TODO confirm.
    async fn get_migrations_from_db(
        &self,
        namespace: Option<&str>,
    ) -> MigrationResult<Vec<MigrationDbInfo>>;
}