// spawn_db/engine/mod.rs

use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::fmt;
use thiserror::Error;
use tokio::process::Command;

pub mod postgres_psql;
9
/// Status of a migration in the tracking tables.
///
/// The textual form written to the database is given by
/// [`MigrationStatus::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationStatus {
    /// Stored as `"SUCCESS"`.
    Success,
    /// Stored as `"ATTEMPTED"`.
    Attempted,
    /// Stored as `"FAILURE"`.
    Failure,
}
17
18impl MigrationStatus {
19    /// Returns the string representation used in the database
20    pub fn as_str(&self) -> &'static str {
21        match self {
22            MigrationStatus::Success => "SUCCESS",
23            MigrationStatus::Attempted => "ATTEMPTED",
24            MigrationStatus::Failure => "FAILURE",
25        }
26    }
27
28    /// Parse a MigrationStatus from a string representation
29    pub fn from_str(s: &str) -> Option<Self> {
30        match s {
31            "SUCCESS" => Some(MigrationStatus::Success),
32            "ATTEMPTED" => Some(MigrationStatus::Attempted),
33            "FAILURE" => Some(MigrationStatus::Failure),
34            _ => None,
35        }
36    }
37}
38
39impl fmt::Display for MigrationStatus {
40    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
41        write!(f, "{}", self.as_str())
42    }
43}
44
/// Activity type for a migration operation.
///
/// The textual form written to the database is given by
/// [`MigrationActivity::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationActivity {
    /// Stored as `"APPLY"`.
    Apply,
    /// Stored as `"ADOPT"`.
    Adopt,
    /// Stored as `"REVERT"`.
    Revert,
}
52
53impl MigrationActivity {
54    /// Returns the string representation used in the database
55    pub fn as_str(&self) -> &'static str {
56        match self {
57            MigrationActivity::Apply => "APPLY",
58            MigrationActivity::Adopt => "ADOPT",
59            MigrationActivity::Revert => "REVERT",
60        }
61    }
62}
63
64impl fmt::Display for MigrationActivity {
65    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
66        write!(f, "{}", self.as_str())
67    }
68}
69
70/// Legacy type alias for backwards compatibility
71pub type MigrationHistoryStatus = MigrationStatus;
72
73/// Information about an existing migration entry
74#[derive(Debug, Clone)]
75pub struct ExistingMigrationInfo {
76    pub migration_name: String,
77    pub namespace: String,
78    pub last_status: MigrationHistoryStatus,
79    pub last_activity: String,
80    pub checksum: String,
81}
82
83/// Database information about a migration
84#[derive(Debug, Clone)]
85pub struct MigrationDbInfo {
86    pub migration_name: String,
87    pub last_status: Option<MigrationHistoryStatus>,
88    pub last_activity: Option<String>,
89    pub checksum: Option<String>,
90}
91
92/// Errors specific to migration operations
93#[derive(Debug, Error)]
94pub enum MigrationError {
95    /// Migration was already successfully applied
96    #[error("migration '{name}' in namespace '{namespace}' already applied successfully")]
97    AlreadyApplied {
98        name: String,
99        namespace: String,
100        info: ExistingMigrationInfo,
101    },
102
103    /// Migration exists but last attempt was not successful
104    #[error("migration '{name}' in namespace '{namespace}' has previous {status} status")]
105    PreviousAttemptFailed {
106        name: String,
107        namespace: String,
108        status: MigrationHistoryStatus,
109        info: ExistingMigrationInfo,
110    },
111
112    /// Database or connection error
113    #[error("database error: {0}")]
114    Database(#[from] anyhow::Error),
115
116    // Could not get advisory lock
117    #[error("could not get advisory lock: {0}")]
118    AdvisoryLock(std::io::Error),
119
120    /// CRITICAL: A migration was run but the result could not be recorded in
121    /// spawn's migration tracking tables. Manual intervention is required.
122    #[error("{}", format_not_recorded_error(.name, .migration_outcome, .migration_error, .recording_error))]
123    NotRecorded {
124        name: String,
125        /// Whether the migration itself succeeded or failed
126        migration_outcome: MigrationStatus,
127        /// The error from the migration itself, if it failed
128        migration_error: Option<String>,
129        /// The error from recording the result
130        recording_error: String,
131    },
132}
133
134fn format_not_recorded_error(
135    name: &str,
136    migration_outcome: &MigrationStatus,
137    migration_error: &Option<String>,
138    recording_error: &str,
139) -> String {
140    let (outcome_label, consequence, resolve_steps) = match migration_outcome {
141        MigrationStatus::Success => (
142            "SUCCEEDED",
143            format!(
144                "The migration changes ARE in your database, but spawn does not know about them.\n\
145                 Re-running this migration may cause errors or duplicate changes."
146            ),
147            format!(
148                "1. Verify the migration was applied by checking your database\n\
149                 2. Run `spawn migration adopt {name}` to record the migration\n\
150                 3. Investigate why recording failed (connection issue? permissions?)"
151            ),
152        ),
153        _ => (
154            "FAILED",
155            format!(
156                "The migration did NOT apply, but spawn was unable to record the failure.\n\
157                 Spawn may not be aware this migration was attempted."
158            ),
159            format!(
160                "1. Check your database to confirm the migration was not applied\n\
161                 2. Investigate why recording failed (connection issue? permissions?)\n\
162                 3. Re-run the migration once the issue is resolved"
163            ),
164        ),
165    };
166
167    let mut msg = format!(
168        "\n\
169         [ACTION REQUIRED] Migration '{name}' {outcome_label} but the result could not be recorded.\n\
170         \n\
171         {consequence}\n\
172         \n\
173         Recording error: {recording_error}",
174    );
175
176    if let Some(migration_err) = migration_error {
177        msg.push_str(&format!("\nMigration error: {}", migration_err));
178    }
179
180    msg.push_str(&format!(
181        "\n\
182         \n\
183         To resolve:\n\
184         {resolve_steps}\n",
185    ));
186
187    msg
188}
189
190/// Result type for migration operations
191pub type MigrationResult<T> = Result<T, MigrationError>;
192
193/// Errors for streaming SQL execution
194#[derive(Debug, Error)]
195pub enum EngineError {
196    #[error("execution failed (exit {exit_code}): {stderr}")]
197    ExecutionFailed { exit_code: i32, stderr: String },
198
199    #[error("IO error: {0}")]
200    Io(#[from] std::io::Error),
201}
202
203#[derive(Clone, Debug, Serialize, Deserialize)]
204pub enum EngineType {
205    #[serde(rename = "postgres-psql")]
206    PostgresPSQL,
207}
208
209impl fmt::Display for EngineType {
210    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
211        match self {
212            EngineType::PostgresPSQL => {
213                write!(f, "postgres-psql")
214            }
215        }
216    }
217}
218
219/// Specifies how to obtain the command to execute for database operations.
220#[derive(Clone, Debug, Deserialize, Serialize)]
221#[serde(tag = "kind", rename_all = "snake_case")]
222pub enum CommandSpec {
223    /// Direct command to execute
224    Direct { direct: Vec<String> },
225    /// A provider command that outputs the actual command as a shell command.
226    Provider {
227        provider: Vec<String>,
228        #[serde(default)]
229        append: Vec<String>,
230    },
231}
232
233#[derive(Clone, Debug, Deserialize, Serialize)]
234pub struct DatabaseConfig {
235    pub engine: EngineType,
236    pub spawn_database: String,
237    #[serde(default = "default_schema")]
238    pub spawn_schema: String,
239    #[serde(default = "default_environment")]
240    pub environment: String,
241
242    #[serde(default)]
243    pub command: Option<CommandSpec>,
244}
245
/// Serde default for `DatabaseConfig::environment`: `"prod"`.
fn default_environment() -> String {
    String::from("prod")
}
249
/// Serde default for `DatabaseConfig::spawn_schema`: `"_spawn"`.
fn default_schema() -> String {
    String::from("_spawn")
}
253
254/// Resolves a CommandSpec to the actual command to execute.
255///
256/// This is a generic function that can be used by any database engine
257/// to resolve their command specification.
258pub async fn resolve_command_spec(spec: CommandSpec) -> Result<Vec<String>> {
259    match spec {
260        CommandSpec::Direct { direct } => Ok(direct),
261        CommandSpec::Provider { provider, append } => {
262            let mut resolved = resolve_provider(&provider).await?;
263            resolved.extend(append);
264            Ok(resolved)
265        }
266    }
267}
268
269/// Executes a provider command and parses its output as a shell command.
270///
271/// The provider must output a shell command string (e.g., `ssh -t -i /path/key user@host`).
272/// The parser handles quoted strings properly using POSIX shell-style parsing.
273async fn resolve_provider(provider: &[String]) -> Result<Vec<String>> {
274    if provider.is_empty() {
275        return Err(anyhow!("Provider command cannot be empty"));
276    }
277
278    let output = Command::new(&provider[0])
279        .args(&provider[1..])
280        .output()
281        .await
282        .context("Failed to execute provider command")?;
283
284    if !output.status.success() {
285        return Err(anyhow!(
286            "Provider command failed (exit {}): {}",
287            output.status.code().unwrap_or(-1),
288            String::from_utf8_lossy(&output.stderr)
289        ));
290    }
291
292    let stdout = String::from_utf8(output.stdout).context("Provider output is not valid UTF-8")?;
293    let trimmed = stdout.trim();
294
295    if trimmed.is_empty() {
296        return Err(anyhow!("Provider returned empty output"));
297    }
298
299    // Parses a shell command string into a Vec<String>, handling quoted arguments.
300    //
301    // Uses the `shlex` crate for proper POSIX shell-style parsing.
302    shlex::split(trimmed).ok_or_else(|| anyhow!("Failed to parse shell command: {}", trimmed))
303}
304
/// Type alias for the writer closure used in execute_with_writer.
///
/// The closure is called once with a synchronous `Write` handle and reports
/// success or failure as an `io::Result`. It must be `Send`.
pub type WriterFn = Box<dyn FnOnce(&mut dyn std::io::Write) -> std::io::Result<()> + Send>;
307
308/// Type alias for an optional stdout writer to capture output
309pub type StdoutWriter = Option<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>;
310
311#[async_trait]
312pub trait Engine: Send + Sync {
313    /// Execute SQL by running the provided writer function.
314    /// - `write_fn`: Closure that writes SQL to the provided Write handle
315    /// - `stdout_writer`: Optional writer to capture stdout. If None, stdout is discarded.
316    /// - `merge_stderr`: If true and stdout_writer is Some, stderr is merged into stdout
317    ///                   at the OS level for true interleaving. Useful for tests.
318    ///                   Note: when merged, stderr is not separately available in errors.
319    /// Engine-specific setup (like psql flags) is handled internally.
320    /// Returns stderr content on failure.
321    async fn execute_with_writer(
322        &self,
323        write_fn: WriterFn,
324        stdout_writer: StdoutWriter,
325        merge_stderr: bool,
326    ) -> Result<(), EngineError>;
327
328    async fn migration_apply(
329        &self,
330        migration_name: &str,
331        write_fn: WriterFn,
332        pin_hash: Option<String>,
333        namespace: &str,
334        retry: bool,
335    ) -> MigrationResult<String>;
336
337    /// Adopt a migration without applying it.
338    /// Creates a dummy table entry marking the migration as having been applied manually.
339    /// Sets checksum to empty and status to 'SUCCESS'.
340    async fn migration_adopt(
341        &self,
342        migration_name: &str,
343        namespace: &str,
344        description: &str,
345    ) -> MigrationResult<String>;
346
347    /// Get database information for all migrations in the given namespace.
348    /// If namespace is None, returns migrations from all namespaces.
349    /// Returns a list of migrations that exist in the database with their latest history entry.
350    async fn get_migrations_from_db(
351        &self,
352        namespace: Option<&str>,
353    ) -> MigrationResult<Vec<MigrationDbInfo>>;
354}