Skip to main content

spawn_db/engine/
postgres_psql.rs

// This is a driver that uses a locally provided PSQL command to execute
// scripts, which enables users' scripts to take advantage of things like the
// built-in PSQL helper commands.
4
5use crate::config::FolderPather;
6use crate::engine::{
7    resolve_command_spec, DatabaseConfig, Engine, EngineError, ExistingMigrationInfo,
8    MigrationActivity, MigrationError, MigrationHistoryStatus, MigrationResult, MigrationStatus,
9    StdoutWriter, WriterFn,
10};
11use crate::escape::{EscapedIdentifier, EscapedLiteral, EscapedQuery, InsecureRawSql};
12use crate::sql_query;
13use crate::store::pinner::latest::Latest;
14use crate::store::{operator_from_includedir, Store};
15use anyhow::{anyhow, Context, Result};
16use async_trait::async_trait;
17use include_dir::{include_dir, Dir};
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::io::Write;
21use std::process::Stdio;
22use std::sync::{Arc, Mutex};
23use std::time::Instant;
24use tokio::io::AsyncReadExt;
25use tokio::process::Command;
26use twox_hash::xxhash3_128;
27use twox_hash::XxHash64;
28
29/// Returns the advisory lock key used to prevent concurrent migrations.
30/// This is computed as XxHash64 of "SPAWN_MIGRATION_LOCK" with seed 1234, cast to i64.
31pub fn migration_lock_key() -> i64 {
32    XxHash64::oneshot(1234, "SPAWN_MIGRATION_LOCK".as_bytes()) as i64
33}
34
#[derive(Debug)]
pub struct PSQL {
    // Resolved argv for invoking psql: element 0 is the program,
    // the rest are its arguments.
    psql_command: Vec<String>,
    /// Schema name as an escaped identifier (for use as schema.table)
    spawn_schema: String,
    // Full database config, kept so update_schema() can build a
    // templating Config that points back at this engine.
    db_config: DatabaseConfig,
}
42
43static PROJECT_DIR: Dir<'_> = include_dir!("./static/engine-migrations/postgres-psql");
44static SPAWN_NAMESPACE: &str = "spawn";
45
46impl PSQL {
47    pub async fn new(config: &DatabaseConfig) -> Result<Box<dyn Engine>> {
48        let command_spec = config
49            .command
50            .clone()
51            .ok_or(anyhow!("Command for database config must be defined"))?;
52
53        let psql_command = resolve_command_spec(command_spec).await?;
54
55        let eng = Box::new(Self {
56            psql_command,
57            spawn_schema: config.spawn_schema.clone(),
58            db_config: config.clone(),
59        });
60
61        // Ensure we have latest schema:
62        eng.update_schema()
63            .await
64            .map_err(MigrationError::Database)?;
65
66        Ok(eng)
67    }
68
69    fn spawn_schema_literal(&self) -> EscapedLiteral {
70        EscapedLiteral::new(&self.spawn_schema)
71    }
72
73    fn spawn_schema_ident(&self) -> EscapedIdentifier {
74        EscapedIdentifier::new(&self.spawn_schema)
75    }
76
77    fn safe_spawn_namespace(&self) -> EscapedLiteral {
78        EscapedLiteral::new(SPAWN_NAMESPACE)
79    }
80}
81
#[async_trait]
impl Engine for PSQL {
    /// Stream SQL produced by `write_fn` into a freshly spawned psql
    /// process and wait for it to exit.
    ///
    /// * `write_fn` runs on a blocking thread; everything it writes to the
    ///   provided writer becomes psql's stdin, after a preamble that sets
    ///   QUIET, disables the pager, and enables ON_ERROR_STOP.
    /// * `stdout_writer`, when present, receives the child's stdout; with
    ///   `merge_stderr` set it receives stderr interleaved as well.
    /// * Returns `EngineError::ExecutionFailed` (exit code + captured
    ///   stderr) when psql exits non-zero, or `EngineError::Io` for
    ///   pipe/spawn failures.
    async fn execute_with_writer(
        &self,
        write_fn: WriterFn,
        stdout_writer: StdoutWriter,
        merge_stderr: bool,
    ) -> Result<(), EngineError> {
        // 1. Create the pipe for stdin
        let (reader, mut writer) = std::io::pipe()?;

        // 2. Configure stdout/stderr and spawn psql
        //
        // When merge_stderr is true, we create our own pipe and give the
        // write end to both stdout and stderr so the OS interleaves them.
        // Otherwise, stdout is piped (or null) and stderr is piped separately.
        let merge = merge_stderr && stdout_writer.is_some();

        let (mut child, combined_read) = if merge {
            let (out_read, out_write) = std::io::pipe()?;
            let out_write_dup = out_write.try_clone()?;
            // Both write ends are moved into the child, so the parent's
            // read end (out_read) sees EOF once the child exits.
            let child = Command::new(&self.psql_command[0])
                .args(&self.psql_command[1..])
                .stdin(Stdio::from(reader))
                .stdout(Stdio::from(out_write))
                .stderr(Stdio::from(out_write_dup))
                .spawn()
                .map_err(EngineError::Io)?;
            (child, Some(out_read))
        } else {
            let stdout_config = if stdout_writer.is_some() {
                Stdio::piped()
            } else {
                Stdio::null()
            };
            let child = Command::new(&self.psql_command[0])
                .args(&self.psql_command[1..])
                .stdin(Stdio::from(reader))
                .stdout(stdout_config)
                .stderr(Stdio::piped())
                .spawn()
                .map_err(EngineError::Io)?;
            (child, None)
        };

        // 3. Copy output to stdout_writer if provided
        // NOTE(review): both paths buffer the child's entire output in
        // memory before forwarding it; acceptable for migration-sized
        // output but not for unbounded streams.
        let stdout_handle = if let Some(mut stdout_dest) = stdout_writer {
            if let Some(mut combined_read) = combined_read {
                // Merged mode: read from our combined pipe in a blocking
                // thread (it's a std::io::PipeReader, not a tokio type),
                // then write to the async destination.
                Some(tokio::task::spawn(async move {
                    let buf = tokio::task::spawn_blocking(move || {
                        use std::io::Read;
                        let mut buf = Vec::new();
                        let _ = combined_read.read_to_end(&mut buf);
                        buf
                    })
                    .await
                    .unwrap_or_default();
                    use tokio::io::AsyncWriteExt;
                    let _ = stdout_dest.write_all(&buf).await;
                }))
            } else {
                let mut stdout = child.stdout.take().expect("stdout should be piped");
                Some(tokio::task::spawn(async move {
                    use tokio::io::AsyncWriteExt;
                    let mut buf = Vec::new();
                    let _ = stdout.read_to_end(&mut buf).await;
                    let _ = stdout_dest.write_all(&buf).await;
                }))
            }
        } else {
            None
        };

        // 4. Drain stderr in background (prevents deadlock).
        //    When merged, child.stderr is None (it shares the stdout pipe).
        let stderr_handle = if let Some(mut stderr) = child.stderr.take() {
            Some(tokio::spawn(async move {
                let mut buf = Vec::new();
                let _ = stderr.read_to_end(&mut buf).await;
                buf
            }))
        } else {
            None
        };

        // 5. Run the writer function in a blocking thread
        let writer_handle = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
            // PSQL-specific setup - QUIET must be first to suppress output from other settings
            writer.write_all(b"\\set QUIET on\n")?;
            writer.write_all(b"\\pset pager off\n")?;
            writer.write_all(b"\\set ON_ERROR_STOP on\n")?;

            // User's write function (template rendering, etc.)
            write_fn(&mut writer)?;

            // Writer dropped here -> EOF to psql
            Ok(())
        });

        // 6. Wait for writing to complete
        writer_handle
            .await
            .map_err(|e| EngineError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))??;

        // 7. Wait for stdout copy if applicable (must complete before we read the buffer)
        if let Some(handle) = stdout_handle {
            let _ = handle.await;
        }

        // 8. Wait for psql and check result
        let status = child.wait().await?;
        let stderr_bytes = match stderr_handle {
            Some(handle) => handle.await.unwrap_or_default(),
            None => Vec::new(),
        };

        if !status.success() {
            return Err(EngineError::ExecutionFailed {
                exit_code: status.code().unwrap_or(-1),
                stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
            });
        }

        Ok(())
    }

    /// Apply a migration in `namespace`, recording the outcome in the
    /// tracking tables. `retry` permits re-running a migration whose
    /// previous attempt failed or was left in ATTEMPTED state.
    async fn migration_apply(
        &self,
        migration_name: &str,
        write_fn: WriterFn,
        pin_hash: Option<String>,
        namespace: &str,
        retry: bool,
    ) -> MigrationResult<String> {
        self.apply_and_record_migration_v1(
            migration_name,
            write_fn,
            pin_hash,
            EscapedLiteral::new(namespace),
            retry,
        )
        .await
    }

    /// Mark a migration as applied without executing it ("adopt").
    ///
    /// Fails with `AlreadyApplied` if the latest history entry is a
    /// success. Previously failed/attempted migrations may be adopted as
    /// a manual resolution path. Records an ADOPT activity with SUCCESS
    /// status and an empty checksum.
    async fn migration_adopt(
        &self,
        migration_name: &str,
        namespace: &str,
        description: &str,
    ) -> MigrationResult<String> {
        let namespace_lit = EscapedLiteral::new(namespace);

        // Check if migration already exists in history
        let existing_status = self
            .get_migration_status(migration_name, &namespace_lit)
            .await
            .map_err(MigrationError::Database)?;

        if let Some(info) = existing_status {
            let name = migration_name.to_string();
            let ns = namespace_lit.raw_value().to_string();

            match info.last_status {
                MigrationHistoryStatus::Success => {
                    return Err(MigrationError::AlreadyApplied {
                        name,
                        namespace: ns,
                        info,
                    });
                }
                // Allow adopting migrations that previously failed or were attempted.
                // This is one of the ways to resolve: fix manually and mark as adopted.
                MigrationHistoryStatus::Attempted | MigrationHistoryStatus::Failure => {}
            }
        }

        // Record the migration with SUCCESS status, ADOPT activity, empty checksum
        self.record_migration(
            migration_name,
            &namespace_lit,
            MigrationStatus::Success,
            MigrationActivity::Adopt,
            None, // empty checksum
            None, // no execution time
            None, // no pin_hash
            Some(description),
        )
        .await?;

        Ok(format!(
            "Migration '{}' adopted successfully",
            migration_name
        ))
    }

    /// Fetch the most recent status/activity/checksum for every recorded
    /// migration, optionally filtered by namespace, by having psql emit a
    /// single JSON array (via json_agg) that we deserialize with serde.
    async fn get_migrations_from_db(
        &self,
        namespace: Option<&str>,
    ) -> MigrationResult<Vec<crate::engine::MigrationDbInfo>> {
        use serde::Deserialize;

        // Build the query with optional namespace filter
        // NOTE(review): namespace_lit is interpolated twice so the filter
        // collapses to "NULL IS NULL OR ..." when no namespace is given —
        // assumes sql_query! renders Option::None as SQL NULL; confirm.
        let namespace_lit = namespace.map(|ns| EscapedLiteral::new(ns));
        let query = sql_query!(
            r#"
            SELECT json_agg(row_to_json(t))
            FROM (
                SELECT DISTINCT ON (m.name)
                    m.name as migration_name,
                    mh.status_id_status as last_status,
                    mh.activity_id_activity as last_activity,
                    encode(mh.checksum, 'hex') as checksum
                FROM {}.migration m
                LEFT JOIN {}.migration_history mh ON m.migration_id = mh.migration_id_migration
                WHERE {} IS NULL OR m.namespace = {}
                ORDER BY m.name, mh.created_at DESC NULLS LAST
            ) t
            "#,
            self.spawn_schema_ident(),
            self.spawn_schema_ident(),
            namespace_lit,
            namespace_lit
        );

        let output = self
            .execute_sql(&query, Some("unaligned"))
            .await
            .map_err(MigrationError::Database)?;

        // Define a struct for JSON deserialization
        #[derive(Deserialize)]
        struct MigrationRow {
            migration_name: String,
            last_status: Option<String>,
            last_activity: Option<String>,
            checksum: Option<String>,
        }

        // Parse the JSON output
        let json_str = output.trim();

        // Handle case where there are no migrations (json_agg returns null)
        if json_str == "null" || json_str.is_empty() {
            return Ok(Vec::new());
        }

        let rows: Vec<MigrationRow> = serde_json::from_str(json_str).map_err(|e| {
            MigrationError::Database(anyhow::anyhow!(
                "Failed to parse JSON from database (output: '{}'): {}",
                json_str,
                e
            ))
        })?;

        // Convert to MigrationDbInfo
        let mut results: Vec<crate::engine::MigrationDbInfo> = rows
            .into_iter()
            .map(|row| {
                let status = row
                    .last_status
                    .as_deref()
                    .and_then(MigrationHistoryStatus::from_str);

                crate::engine::MigrationDbInfo {
                    migration_name: row.migration_name,
                    last_status: status,
                    last_activity: row.last_activity,
                    checksum: row.checksum,
                }
            })
            .collect();

        // Sort by migration name for consistent output
        results.sort_by(|a, b| a.migration_name.cmp(&b.migration_name));

        Ok(results)
    }
}
363
/// A simple AsyncWrite implementation that appends to a shared Vec<u8>.
/// Used by execute_sql() to capture psql stdout for later parsing.
struct SharedBufWriter(Arc<Mutex<Vec<u8>>>);
366
367impl tokio::io::AsyncWrite for SharedBufWriter {
368    fn poll_write(
369        self: std::pin::Pin<&mut Self>,
370        _cx: &mut std::task::Context<'_>,
371        buf: &[u8],
372    ) -> std::task::Poll<std::io::Result<usize>> {
373        self.0.lock().unwrap().extend_from_slice(buf);
374        std::task::Poll::Ready(Ok(buf.len()))
375    }
376
377    fn poll_flush(
378        self: std::pin::Pin<&mut Self>,
379        _cx: &mut std::task::Context<'_>,
380    ) -> std::task::Poll<std::io::Result<()>> {
381        std::task::Poll::Ready(Ok(()))
382    }
383
384    fn poll_shutdown(
385        self: std::pin::Pin<&mut Self>,
386        _cx: &mut std::task::Context<'_>,
387    ) -> std::task::Poll<std::io::Result<()>> {
388        std::task::Poll::Ready(Ok(()))
389    }
390}
391
/// A writer that tees output to both an inner writer and a hasher for checksum calculation.
/// This allows streaming the migration SQL while computing the checksum on-the-fly.
struct TeeWriter<W: Write> {
    // Destination that receives all bytes written through the tee.
    inner: W,
    // Running 128-bit xxHash3 of everything written so far.
    hasher: xxhash3_128::Hasher,
}
398
399impl<W: Write> TeeWriter<W> {
400    fn new(inner: W) -> Self {
401        Self {
402            inner,
403            hasher: xxhash3_128::Hasher::new(),
404        }
405    }
406
407    fn finish(self) -> (W, u128) {
408        (self.inner, self.hasher.finish_128())
409    }
410}
411
412impl<W: Write> Write for TeeWriter<W> {
413    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
414        self.hasher.write(buf);
415        self.inner.write(buf)
416    }
417
418    fn flush(&mut self) -> std::io::Result<()> {
419        self.inner.flush()
420    }
421}
422
/// Free function to build the SQL query for recording a migration.
/// This can be used from both async and sync contexts.
///
/// Produces a BEGIN..COMMIT block that upserts the migration row (keyed
/// on (name, namespace)) and appends one migration_history row with the
/// given status, activity, checksum, execution time, pin hash, and
/// description.
fn build_record_migration_sql(
    spawn_schema: &str,
    migration_name: &str,
    namespace: &EscapedLiteral,
    status: MigrationStatus,
    activity: MigrationActivity,
    checksum: Option<&str>,
    execution_time: Option<f32>,
    pin_hash: Option<&str>,
    description: Option<&str>,
) -> EscapedQuery {
    let schema_ident = EscapedIdentifier::new(spawn_schema);
    let safe_migration_name = EscapedLiteral::new(migration_name);
    let safe_status = EscapedLiteral::new(status.as_str());
    let safe_activity = EscapedLiteral::new(activity.as_str());
    let safe_description = EscapedLiteral::new(description.unwrap_or(""));
    // If no checksum provided, use empty bytea (decode returns empty bytea for empty string)
    // NOTE(review): the checksum string is spliced into SQL unescaped via
    // InsecureRawSql — assumes callers only pass hex produced by
    // format!("{:032x}", ...); verify no user-controlled value reaches here.
    let checksum_expr = checksum
        .map(|c| format!("decode('{}', 'hex')", c))
        .unwrap_or_else(|| "decode('', 'hex')".to_string());
    let checksum_raw = InsecureRawSql::new(&checksum_expr);
    // None here presumably renders as SQL NULL in sql_query! — confirm.
    let safe_pin_hash = pin_hash.map(|h| EscapedLiteral::new(h));

    // f32 Display output is numeric-only, so the raw interpolation below
    // cannot inject SQL.
    let duration_interval = execution_time
        .map(|d| InsecureRawSql::new(&format!("INTERVAL '{} second'", d)))
        .unwrap_or_else(|| InsecureRawSql::new("INTERVAL '0 second'"));

    sql_query!(
        r#"
BEGIN;
WITH inserted_migration AS (
    INSERT INTO {}.migration (name, namespace) VALUES ({}, {})
    ON CONFLICT (name, namespace) DO UPDATE SET name = EXCLUDED.name
    RETURNING migration_id
)
INSERT INTO {}.migration_history (
    migration_id_migration,
    activity_id_activity,
    created_by,
    description,
    status_note,
    status_id_status,
    checksum,
    execution_time,
    pin_hash
)
SELECT
    migration_id,
    {},
    'unused',
    {},
    '',
    {},
    {},
    {},
    {}
FROM inserted_migration;
COMMIT;
"#,
        schema_ident,
        safe_migration_name,
        namespace,
        schema_ident,
        safe_activity,
        safe_description,
        safe_status,
        checksum_raw,
        duration_interval,
        safe_pin_hash,
    )
}
496
impl PSQL {
    /// Bootstrap or upgrade the engine's own tracking schema.
    ///
    /// Applies, in order, any bundled engine migrations (compiled in via
    /// `include_dir`) that are not yet recorded under the reserved
    /// "spawn" namespace. Safe to call when the tracking tables do not
    /// exist yet: the first migration creates them.
    pub async fn update_schema(&self) -> Result<()> {
        // Create a memory operator from the included directory containing
        // the engine's own migration scripts
        let op = operator_from_includedir(&PROJECT_DIR, None)
            .await
            .context("Failed to create operator from included directory")?;

        // Create a pinner and store to list and load migrations
        let pinner = Latest::new("").context("Failed to create Latest pinner")?;
        let pather = FolderPather {
            spawn_folder: "".to_string(),
        };
        let store = Store::new(Box::new(pinner), op.clone(), pather)
            .context("Failed to create store for update_schema")?;

        // Get list of all available migrations (sorted oldest to newest)
        let available_migrations = store
            .list_migrations()
            .await
            .context("Failed to list migrations")?;

        // Check if migration table exists to determine if this is bootstrap
        let migration_table_exists = self
            .migration_table_exists()
            .await
            .context("Failed checking if migration table exists")?;

        // Get set of already applied migrations (empty if table doesn't exist)
        let applied_migrations: HashSet<String> = if migration_table_exists {
            self.get_applied_migrations_set(&self.safe_spawn_namespace())
                .await
                .context("Failed to get applied migrations set")?
        } else {
            HashSet::new()
        };

        // Create a config to use for generating using spawn templating
        // engine.
        let mut cfg = crate::config::Config::load("spawn.toml", &op, None)
            .await
            .context("Failed to load config for postgres psql")?;
        let dbengtype = "psql".to_string();
        cfg.database = Some(dbengtype.clone());
        cfg.databases = HashMap::from([(dbengtype, self.db_config.clone())]);

        // Apply each migration that hasn't been applied yet
        for migration_path in available_migrations {
            // Extract migration name from path (e.g., "migrations/001-base-migration-table/" -> "001-base-migration-table")
            let migration_name = migration_path
                .trim_end_matches('/')
                .rsplit('/')
                .next()
                .unwrap_or(&migration_path);

            // Skip if already applied
            if applied_migrations.contains(migration_name) {
                continue;
            }

            let migrator = crate::migrator::Migrator::new(&cfg, &migration_name, false);

            // Load and render the migration, exposing the spawn schema name
            // to the template as the "schema" variable.
            let variables = crate::variables::Variables::from_str(
                "json",
                &serde_json::json!({"schema": &self.spawn_schema}).to_string(),
            )?;
            let gen = migrator.generate_streaming(Some(variables)).await?;
            let mut buffer = Vec::new();
            gen.render_to_writer(&mut buffer)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
            let content = String::from_utf8(buffer)?;

            // Apply the migration and record it
            // Note: even for bootstrap, the first migration creates the tables,
            // so they exist by the time we record the migration.
            let write_fn: WriterFn =
                Box::new(move |writer: &mut dyn Write| writer.write_all(content.as_bytes()));
            match self
                .apply_and_record_migration_v1(
                    migration_name,
                    write_fn,
                    None, // pin_hash not used for engine migrations
                    self.safe_spawn_namespace(),
                    false, // no retry for internal schema migrations
                )
                .await
            {
                Ok(_) => {}
                // For internal schema migrations, already applied is fine
                Err(MigrationError::AlreadyApplied { .. }) => {}
                // Other errors should propagate
                Err(e) => return Err(e.into()),
            }
        }

        Ok(())
    }

    /// Execute SQL and return stdout as a String.
    /// Used for internal queries where we need to parse results.
    ///
    /// When `format` is given (e.g. "csv", "unaligned"), tuples_only is
    /// also enabled so the output contains data rows only, no headers.
    async fn execute_sql(&self, query: &EscapedQuery, format: Option<&str>) -> Result<String> {
        let query_str = query.as_str().to_string();
        let format_owned = format.map(|s| s.to_string());

        // Create a shared buffer to capture stdout
        let stdout_buf = Arc::new(Mutex::new(Vec::new()));
        let stdout_buf_clone = stdout_buf.clone();

        self.execute_with_writer(
            Box::new(move |writer| {
                // Format settings if requested (QUIET is already set globally)
                if let Some(fmt) = format_owned {
                    writer.write_all(b"\\pset tuples_only on\n")?;
                    writer.write_all(format!("\\pset format {}\n", fmt).as_bytes())?;
                }
                writer.write_all(query_str.as_bytes())?;
                Ok(())
            }),
            Some(Box::new(SharedBufWriter(stdout_buf_clone))),
            false, // Don't merge stderr for internal queries
        )
        .await
        .map_err(|e| anyhow!("SQL execution failed: {}", e))?;

        let buf = stdout_buf.lock().unwrap();
        Ok(String::from_utf8_lossy(&buf).to_string())
    }

    /// True when the spawn schema's `migration` table exists.
    async fn migration_table_exists(&self) -> Result<bool> {
        self.table_exists("migration").await
    }

    /// True when the spawn schema's `migration_history` table exists.
    async fn migration_history_table_exists(&self) -> Result<bool> {
        self.table_exists("migration_history").await
    }

    /// Check information_schema for `table_name` within the spawn schema.
    async fn table_exists(&self, table_name: &str) -> Result<bool> {
        let safe_table_name = EscapedLiteral::new(table_name);
        // Use type-safe escaped types - escaping happens at construction time
        let query = sql_query!(
            r#"
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = {}
                AND table_name = {}
            );
            "#,
            self.spawn_schema_literal(),
            safe_table_name
        );

        let output = self.execute_sql(&query, Some("csv")).await?;
        // With tuples_only mode, output is just "t" or "f"
        Ok(output.trim() == "t")
    }

    /// Return the set of migration names recorded for `namespace`.
    /// Assumes the migration table exists (callers check first).
    async fn get_applied_migrations_set(
        &self,
        namespace: &EscapedLiteral,
    ) -> Result<HashSet<String>> {
        let query = sql_query!(
            "SELECT name FROM {}.migration WHERE namespace = {};",
            self.spawn_schema_ident(),
            namespace,
        );

        let output = self.execute_sql(&query, Some("csv")).await?;
        let mut migrations = HashSet::new();

        // With tuples_only mode, we get just the data rows (no headers)
        for line in output.lines() {
            let name = line.trim();
            if !name.is_empty() {
                migrations.insert(name.to_string());
            }
        }

        Ok(migrations)
    }

    /// Get the latest migration history entry for a given migration name and namespace.
    /// Returns None if no history entry exists.
    ///
    /// NOTE(review): the CSV row is split naively on ','. A migration
    /// name or namespace containing a comma (psql would quote it) would
    /// mis-parse; also any unexpected row shape silently yields None.
    /// Confirm names are constrained to comma-free values upstream.
    async fn get_migration_status(
        &self,
        migration_name: &str,
        namespace: &EscapedLiteral,
    ) -> Result<Option<ExistingMigrationInfo>> {
        let safe_migration_name = EscapedLiteral::new(migration_name);
        let query = sql_query!(
            r#"
            SELECT m.name, m.namespace, mh.status_id_status, mh.activity_id_activity, encode(mh.checksum, 'hex')
            FROM {}.migration_history mh
            JOIN {}.migration m ON mh.migration_id_migration = m.migration_id
            WHERE m.name = {} AND m.namespace = {}
            ORDER BY mh.migration_history_id DESC
            LIMIT 1;
            "#,
            self.spawn_schema_ident(),
            self.spawn_schema_ident(),
            safe_migration_name,
            namespace
        );

        let output = self.execute_sql(&query, Some("csv")).await?;

        // With tuples_only mode, we get just the data row (no headers).
        // Parse CSV: name,namespace,status_id_status,activity_id_activity,checksum
        let data_line = output.trim();
        if data_line.is_empty() {
            return Ok(None);
        }

        let parts: Vec<&str> = data_line.split(',').collect();
        if parts.len() < 5 {
            return Ok(None);
        }

        let status = match parts[2].trim() {
            "SUCCESS" => MigrationHistoryStatus::Success,
            "ATTEMPTED" => MigrationHistoryStatus::Attempted,
            "FAILURE" => MigrationHistoryStatus::Failure,
            _ => return Ok(None),
        };

        Ok(Some(ExistingMigrationInfo {
            migration_name: parts[0].trim().to_string(),
            namespace: parts[1].trim().to_string(),
            last_status: status,
            last_activity: parts[3].trim().to_string(),
            checksum: parts[4].trim().to_string(),
        }))
    }

    /// Build the SQL query for recording a migration in the tracking tables.
    /// Records a migration in the tracking tables using its own psql session.
    /// Used when there is no existing writer (e.g., adopt).
    async fn record_migration(
        &self,
        migration_name: &str,
        namespace: &EscapedLiteral,
        status: MigrationStatus,
        activity: MigrationActivity,
        checksum: Option<&str>,
        execution_time: Option<f32>,
        pin_hash: Option<&str>,
        description: Option<&str>,
    ) -> MigrationResult<()> {
        let record_query = build_record_migration_sql(
            &self.spawn_schema,
            migration_name,
            namespace,
            status,
            activity,
            checksum,
            execution_time,
            pin_hash,
            description,
        );

        self.execute_with_writer(
            Box::new(move |writer| {
                writer.write_all(record_query.as_str().as_bytes())?;
                Ok(())
            }),
            None,
            false, // Don't merge stderr for recording migrations
        )
        .await
        .map_err(|e| match e {
            EngineError::ExecutionFailed { exit_code, stderr } => {
                MigrationError::Database(anyhow!(
                    "Failed to record migration (exit {}): {}",
                    exit_code,
                    stderr
                ))
            }
            EngineError::Io(e) => MigrationError::Database(e.into()),
        })?;

        Ok(())
    }

    // This is versioned because if we change the schema significantly enough
    // later, we'll have to still write earlier migrations to the table using
    // the format of the migration table as it is at that point.
    /// Apply a migration in one psql session, then record the outcome
    /// (success or failure) in a second session.
    ///
    /// The two-session design means the record survives even when the
    /// migration session itself dies. A session-scoped advisory lock
    /// prevents concurrent applies; it is released implicitly when the
    /// migration's psql process exits.
    async fn apply_and_record_migration_v1(
        &self,
        migration_name: &str,
        write_fn: WriterFn,
        pin_hash: Option<String>,
        namespace: EscapedLiteral,
        retry: bool,
    ) -> MigrationResult<String> {
        // Check if migration already exists in history (skip if table doesn't exist yet)
        let existing_status = if self
            .migration_history_table_exists()
            .await
            .map_err(MigrationError::Database)?
        {
            self.get_migration_status(migration_name, &namespace)
                .await
                .map_err(MigrationError::Database)?
        } else {
            None
        };

        if let Some(info) = existing_status {
            if !retry {
                let name = migration_name.to_string();
                let ns = namespace.raw_value().to_string();

                match info.last_status {
                    MigrationHistoryStatus::Success => {
                        return Err(MigrationError::AlreadyApplied {
                            name,
                            namespace: ns,
                            info,
                        });
                    }
                    MigrationHistoryStatus::Attempted | MigrationHistoryStatus::Failure => {
                        return Err(MigrationError::PreviousAttemptFailed {
                            name,
                            namespace: ns,
                            status: info.last_status.clone(),
                            info,
                        });
                    }
                }
            }
        }

        let start_time = Instant::now();
        let lock_checksum = migration_lock_key();

        // Use Arc<Mutex<>> to extract checksum from the closure
        let checksum_result: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
        let checksum_result_clone = checksum_result.clone();

        // Session 1: Run the migration SQL only
        let migration_result = self
            .execute_with_writer(
                Box::new(move |writer| {
                    // Acquire advisory lock (session-scoped; psql exiting
                    // releases it even on failure)
                    writer.write_all(
                        format!(
                            r#"DO $$ BEGIN IF NOT pg_try_advisory_lock({}) THEN RAISE EXCEPTION 'Could not acquire advisory lock'; END IF; END $$;"#,
                            lock_checksum
                        )
                        .as_bytes(),
                    )?;

                    // Wrap the writer in a TeeWriter to compute checksum while streaming.
                    // The checksum covers only the migration SQL, not the lock preamble.
                    let mut tee_writer = TeeWriter::new(writer);

                    // Execute the user's write function (streams migration SQL)
                    write_fn(&mut tee_writer)?;

                    // Extract the checksum
                    let (_writer, content_checksum) = tee_writer.finish();
                    let checksum_hex = format!("{:032x}", content_checksum);
                    *checksum_result_clone.lock().unwrap() = Some(checksum_hex);

                    Ok(())
                }),
                None,
                false, // Don't merge stderr for migration apply
            )
            .await;

        let duration = start_time.elapsed().as_secs_f32();
        let checksum_hex = checksum_result.lock().unwrap().clone();

        // Determine status based on session 1 result
        let (status, migration_error) = match &migration_result {
            Ok(()) => (MigrationStatus::Success, None),
            Err(EngineError::ExecutionFailed { exit_code, stderr }) => {
                // Lock contention is reported distinctly — nothing ran, so
                // nothing is recorded.
                if stderr.contains("Could not acquire advisory lock") {
                    return Err(MigrationError::AdvisoryLock(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        stderr.clone(),
                    )));
                }
                (
                    MigrationStatus::Failure,
                    Some(format!("psql exited with code {}: {}", exit_code, stderr)),
                )
            }
            Err(EngineError::Io(e)) => {
                return Err(MigrationError::Database(anyhow!(
                    "IO error running migration: {}",
                    e
                )));
            }
        };

        // Session 2: Record the outcome (success or failure)
        let record_result = self
            .record_migration(
                migration_name,
                &namespace,
                status,
                MigrationActivity::Apply,
                checksum_hex.as_deref(),
                Some(duration),
                pin_hash.as_deref(),
                None,
            )
            .await;

        // Handle recording failure
        if let Err(record_err) = record_result {
            // If migration succeeded but recording failed, that's the critical state
            if migration_error.is_none() {
                return Err(MigrationError::NotRecorded {
                    name: migration_name.to_string(),
                    migration_outcome: MigrationStatus::Success,
                    migration_error: None,
                    recording_error: format!("{}", record_err),
                });
            }
            // Both migration and recording failed
            return Err(MigrationError::NotRecorded {
                name: migration_name.to_string(),
                migration_outcome: MigrationStatus::Failure,
                migration_error: migration_error.clone(),
                recording_error: format!("{}", record_err),
            });
        }

        // If the migration itself failed (but was recorded), return that error
        if let Some(err_msg) = migration_error {
            return Err(MigrationError::Database(anyhow!(
                "Migration '{}' failed: {}",
                migration_name,
                err_msg
            )));
        }

        Ok("Migration applied successfully".to_string())
    }
}
938}