Skip to main content

spawn_db/engine/
postgres_psql.rs

1// This is a driver that uses a locally provided PSQL command to execute
2// scripts, which enables user's scripts to take advantage of things like the
3// build in PSQL helper commands.
4
5use crate::config::FolderPather;
6use crate::engine::{
7    resolve_command_spec, DatabaseConfig, Engine, EngineError, ExistingMigrationInfo,
8    MigrationActivity, MigrationError, MigrationHistoryStatus, MigrationResult, MigrationStatus,
9    StdoutWriter, WriterFn,
10};
11use crate::escape::{EscapedIdentifier, EscapedLiteral, EscapedQuery, InsecureRawSql};
12use crate::sql_query;
13use crate::store::pinner::latest::Latest;
14use crate::store::{operator_from_includedir, Store};
15use anyhow::{anyhow, Context, Result};
16use async_trait::async_trait;
17use include_dir::{include_dir, Dir};
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::io::Write;
21use std::process::Stdio;
22use std::sync::{Arc, Mutex};
23use std::time::Instant;
24use tokio::io::AsyncReadExt;
25use tokio::process::Command;
26use twox_hash::xxhash3_128;
27use twox_hash::XxHash64;
28
29/// Returns the advisory lock key used to prevent concurrent migrations.
30/// This is computed as XxHash64 of "SPAWN_MIGRATION_LOCK" with seed 1234, cast to i64.
31pub fn migration_lock_key() -> i64 {
32    XxHash64::oneshot(1234, "SPAWN_MIGRATION_LOCK".as_bytes()) as i64
33}
34
35#[derive(Debug)]
36pub struct PSQL {
37    psql_command: Vec<String>,
38    /// Schema name as an escaped identifier (for use as schema.table)
39    spawn_schema: String,
40    db_config: DatabaseConfig,
41}
42
43static PROJECT_DIR: Dir<'_> = include_dir!("./static/engine-migrations/postgres-psql");
44static SPAWN_NAMESPACE: &str = "spawn";
45
46impl PSQL {
47    pub async fn new(config: &DatabaseConfig) -> Result<Box<dyn Engine>> {
48        let command_spec = config
49            .command
50            .clone()
51            .ok_or(anyhow!("Command for database config must be defined"))?;
52
53        let psql_command = resolve_command_spec(command_spec).await?;
54
55        let eng = Box::new(Self {
56            psql_command,
57            spawn_schema: config.spawn_schema.clone(),
58            db_config: config.clone(),
59        });
60
61        // Ensure we have latest schema:
62        eng.update_schema()
63            .await
64            .map_err(MigrationError::Database)?;
65
66        Ok(eng)
67    }
68
69    fn spawn_schema_literal(&self) -> EscapedLiteral {
70        EscapedLiteral::new(&self.spawn_schema)
71    }
72
73    fn spawn_schema_ident(&self) -> EscapedIdentifier {
74        EscapedIdentifier::new(&self.spawn_schema)
75    }
76
77    fn safe_spawn_namespace(&self) -> EscapedLiteral {
78        EscapedLiteral::new(SPAWN_NAMESPACE)
79    }
80}
81
82#[async_trait]
83impl Engine for PSQL {
84    async fn execute_with_writer(
85        &self,
86        write_fn: WriterFn,
87        stdout_writer: StdoutWriter,
88    ) -> Result<(), EngineError> {
89        // 1. Create the pipe for stdin
90        let (reader, mut writer) = std::io::pipe()?;
91
92        // 2. Configure stdout based on whether we have a writer
93        let stdout_config = if stdout_writer.is_some() {
94            Stdio::piped()
95        } else {
96            Stdio::null()
97        };
98
99        // 3. Spawn psql reading from the pipe
100        let mut child = Command::new(&self.psql_command[0])
101            .args(&self.psql_command[1..])
102            .stdin(Stdio::from(reader))
103            .stdout(stdout_config)
104            .stderr(Stdio::piped())
105            .spawn()
106            .map_err(EngineError::Io)?;
107
108        // 4. If we have a stdout writer, spawn task to copy stdout to it
109        let stdout_handle = if let Some(mut stdout_dest) = stdout_writer {
110            let mut stdout = child.stdout.take().expect("stdout should be piped");
111            Some(tokio::spawn(async move {
112                use tokio::io::AsyncWriteExt;
113                let mut buf = Vec::new();
114                let _ = stdout.read_to_end(&mut buf).await;
115                let _ = stdout_dest.write_all(&buf).await;
116            }))
117        } else {
118            None
119        };
120
121        // 5. Drain stderr in background (always, prevents deadlock)
122        let mut stderr = child.stderr.take().expect("stderr should be piped");
123        let stderr_handle = tokio::spawn(async move {
124            let mut buf = Vec::new();
125            let _ = stderr.read_to_end(&mut buf).await;
126            buf
127        });
128
129        // 6. Run the writer function in a blocking thread
130        let writer_handle = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
131            // PSQL-specific setup - QUIET must be first to suppress output from other settings
132            writer.write_all(b"\\set QUIET on\n")?;
133            writer.write_all(b"\\pset pager off\n")?;
134            writer.write_all(b"\\set ON_ERROR_STOP on\n")?;
135
136            // User's write function (template rendering, etc.)
137            write_fn(&mut writer)?;
138
139            // Writer dropped here -> EOF to psql
140            Ok(())
141        });
142
143        // 7. Wait for writing to complete
144        writer_handle
145            .await
146            .map_err(|e| EngineError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))??;
147
148        // 8. Wait for stdout copy if applicable (must complete before we read the buffer)
149        if let Some(handle) = stdout_handle {
150            // Wait for both the async read and the blocking write to complete
151            let _ = handle.await;
152        }
153
154        // 9. Wait for psql and check result
155        let status = child.wait().await?;
156        let stderr_bytes = stderr_handle.await.unwrap_or_default();
157
158        if !status.success() {
159            return Err(EngineError::ExecutionFailed {
160                exit_code: status.code().unwrap_or(-1),
161                stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
162            });
163        }
164
165        Ok(())
166    }
167
168    async fn migration_apply(
169        &self,
170        migration_name: &str,
171        write_fn: WriterFn,
172        pin_hash: Option<String>,
173        namespace: &str,
174        retry: bool,
175    ) -> MigrationResult<String> {
176        self.apply_and_record_migration_v1(
177            migration_name,
178            write_fn,
179            pin_hash,
180            EscapedLiteral::new(namespace),
181            retry,
182        )
183        .await
184    }
185
186    async fn migration_adopt(
187        &self,
188        migration_name: &str,
189        namespace: &str,
190        description: &str,
191    ) -> MigrationResult<String> {
192        let namespace_lit = EscapedLiteral::new(namespace);
193
194        // Check if migration already exists in history
195        let existing_status = self
196            .get_migration_status(migration_name, &namespace_lit)
197            .await
198            .map_err(MigrationError::Database)?;
199
200        if let Some(info) = existing_status {
201            let name = migration_name.to_string();
202            let ns = namespace_lit.raw_value().to_string();
203
204            match info.last_status {
205                MigrationHistoryStatus::Success => {
206                    return Err(MigrationError::AlreadyApplied {
207                        name,
208                        namespace: ns,
209                        info,
210                    });
211                }
212                // Allow adopting migrations that previously failed or were attempted.
213                // This is one of the ways to resolve: fix manually and mark as adopted.
214                MigrationHistoryStatus::Attempted | MigrationHistoryStatus::Failure => {}
215            }
216        }
217
218        // Record the migration with SUCCESS status, ADOPT activity, empty checksum
219        self.record_migration(
220            migration_name,
221            &namespace_lit,
222            MigrationStatus::Success,
223            MigrationActivity::Adopt,
224            None, // empty checksum
225            None, // no execution time
226            None, // no pin_hash
227            Some(description),
228        )
229        .await?;
230
231        Ok(format!(
232            "Migration '{}' adopted successfully",
233            migration_name
234        ))
235    }
236
237    async fn get_migrations_from_db(
238        &self,
239        namespace: Option<&str>,
240    ) -> MigrationResult<Vec<crate::engine::MigrationDbInfo>> {
241        use serde::Deserialize;
242
243        // Build the query with optional namespace filter
244        let namespace_lit = namespace.map(|ns| EscapedLiteral::new(ns));
245        let query = sql_query!(
246            r#"
247            SELECT json_agg(row_to_json(t))
248            FROM (
249                SELECT DISTINCT ON (m.name)
250                    m.name as migration_name,
251                    mh.status_id_status as last_status,
252                    mh.activity_id_activity as last_activity,
253                    encode(mh.checksum, 'hex') as checksum
254                FROM {}.migration m
255                LEFT JOIN {}.migration_history mh ON m.migration_id = mh.migration_id_migration
256                WHERE {} IS NULL OR m.namespace = {}
257                ORDER BY m.name, mh.created_at DESC NULLS LAST
258            ) t
259            "#,
260            self.spawn_schema_ident(),
261            self.spawn_schema_ident(),
262            namespace_lit,
263            namespace_lit
264        );
265
266        let output = self
267            .execute_sql(&query, Some("unaligned"))
268            .await
269            .map_err(MigrationError::Database)?;
270
271        // Define a struct for JSON deserialization
272        #[derive(Deserialize)]
273        struct MigrationRow {
274            migration_name: String,
275            last_status: Option<String>,
276            last_activity: Option<String>,
277            checksum: Option<String>,
278        }
279
280        // Parse the JSON output
281        let json_str = output.trim();
282
283        // Handle case where there are no migrations (json_agg returns null)
284        if json_str == "null" || json_str.is_empty() {
285            return Ok(Vec::new());
286        }
287
288        let rows: Vec<MigrationRow> = serde_json::from_str(json_str).map_err(|e| {
289            MigrationError::Database(anyhow::anyhow!(
290                "Failed to parse JSON from database (output: '{}'): {}",
291                json_str,
292                e
293            ))
294        })?;
295
296        // Convert to MigrationDbInfo
297        let mut results: Vec<crate::engine::MigrationDbInfo> = rows
298            .into_iter()
299            .map(|row| {
300                let status = row
301                    .last_status
302                    .as_deref()
303                    .and_then(MigrationHistoryStatus::from_str);
304
305                crate::engine::MigrationDbInfo {
306                    migration_name: row.migration_name,
307                    last_status: status,
308                    last_activity: row.last_activity,
309                    checksum: row.checksum,
310                }
311            })
312            .collect();
313
314        // Sort by migration name for consistent output
315        results.sort_by(|a, b| a.migration_name.cmp(&b.migration_name));
316
317        Ok(results)
318    }
319}
320
321/// A simple AsyncWrite implementation that appends to a shared Vec<u8>
322struct SharedBufWriter(Arc<Mutex<Vec<u8>>>);
323
324impl tokio::io::AsyncWrite for SharedBufWriter {
325    fn poll_write(
326        self: std::pin::Pin<&mut Self>,
327        _cx: &mut std::task::Context<'_>,
328        buf: &[u8],
329    ) -> std::task::Poll<std::io::Result<usize>> {
330        self.0.lock().unwrap().extend_from_slice(buf);
331        std::task::Poll::Ready(Ok(buf.len()))
332    }
333
334    fn poll_flush(
335        self: std::pin::Pin<&mut Self>,
336        _cx: &mut std::task::Context<'_>,
337    ) -> std::task::Poll<std::io::Result<()>> {
338        std::task::Poll::Ready(Ok(()))
339    }
340
341    fn poll_shutdown(
342        self: std::pin::Pin<&mut Self>,
343        _cx: &mut std::task::Context<'_>,
344    ) -> std::task::Poll<std::io::Result<()>> {
345        std::task::Poll::Ready(Ok(()))
346    }
347}
348
349/// A writer that tees output to both an inner writer and a hasher for checksum calculation.
350/// This allows streaming the migration SQL while computing the checksum on-the-fly.
351struct TeeWriter<W: Write> {
352    inner: W,
353    hasher: xxhash3_128::Hasher,
354}
355
356impl<W: Write> TeeWriter<W> {
357    fn new(inner: W) -> Self {
358        Self {
359            inner,
360            hasher: xxhash3_128::Hasher::new(),
361        }
362    }
363
364    fn finish(self) -> (W, u128) {
365        (self.inner, self.hasher.finish_128())
366    }
367}
368
369impl<W: Write> Write for TeeWriter<W> {
370    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
371        self.hasher.write(buf);
372        self.inner.write(buf)
373    }
374
375    fn flush(&mut self) -> std::io::Result<()> {
376        self.inner.flush()
377    }
378}
379
380/// Free function to build the SQL query for recording a migration.
381/// This can be used from both async and sync contexts.
382fn build_record_migration_sql(
383    spawn_schema: &str,
384    migration_name: &str,
385    namespace: &EscapedLiteral,
386    status: MigrationStatus,
387    activity: MigrationActivity,
388    checksum: Option<&str>,
389    execution_time: Option<f32>,
390    pin_hash: Option<&str>,
391    description: Option<&str>,
392) -> EscapedQuery {
393    let schema_ident = EscapedIdentifier::new(spawn_schema);
394    let safe_migration_name = EscapedLiteral::new(migration_name);
395    let safe_status = EscapedLiteral::new(status.as_str());
396    let safe_activity = EscapedLiteral::new(activity.as_str());
397    let safe_description = EscapedLiteral::new(description.unwrap_or(""));
398    // If no checksum provided, use empty bytea (decode returns empty bytea for empty string)
399    let checksum_expr = checksum
400        .map(|c| format!("decode('{}', 'hex')", c))
401        .unwrap_or_else(|| "decode('', 'hex')".to_string());
402    let checksum_raw = InsecureRawSql::new(&checksum_expr);
403    let safe_pin_hash = pin_hash.map(|h| EscapedLiteral::new(h));
404
405    let duration_interval = execution_time
406        .map(|d| InsecureRawSql::new(&format!("INTERVAL '{} second'", d)))
407        .unwrap_or_else(|| InsecureRawSql::new("INTERVAL '0 second'"));
408
409    sql_query!(
410        r#"
411BEGIN;
412WITH inserted_migration AS (
413    INSERT INTO {}.migration (name, namespace) VALUES ({}, {})
414    ON CONFLICT (name, namespace) DO UPDATE SET name = EXCLUDED.name
415    RETURNING migration_id
416)
417INSERT INTO {}.migration_history (
418    migration_id_migration,
419    activity_id_activity,
420    created_by,
421    description,
422    status_note,
423    status_id_status,
424    checksum,
425    execution_time,
426    pin_hash
427)
428SELECT
429    migration_id,
430    {},
431    'unused',
432    {},
433    '',
434    {},
435    {},
436    {},
437    {}
438FROM inserted_migration;
439COMMIT;
440"#,
441        schema_ident,
442        safe_migration_name,
443        namespace,
444        schema_ident,
445        safe_activity,
446        safe_description,
447        safe_status,
448        checksum_raw,
449        duration_interval,
450        safe_pin_hash,
451    )
452}
453
454impl PSQL {
455    pub async fn update_schema(&self) -> Result<()> {
456        // Create a memory operator from the included directory containing
457        // the engine's own migration scripts
458        let op = operator_from_includedir(&PROJECT_DIR, None)
459            .await
460            .context("Failed to create operator from included directory")?;
461
462        // Create a pinner and store to list and load migrations
463        let pinner = Latest::new("").context("Failed to create Latest pinner")?;
464        let pather = FolderPather {
465            spawn_folder: "".to_string(),
466        };
467        let store = Store::new(Box::new(pinner), op.clone(), pather)
468            .context("Failed to create store for update_schema")?;
469
470        // Get list of all available migrations (sorted oldest to newest)
471        let available_migrations = store
472            .list_migrations()
473            .await
474            .context("Failed to list migrations")?;
475
476        // Check if migration table exists to determine if this is bootstrap
477        let migration_table_exists = self
478            .migration_table_exists()
479            .await
480            .context("Failed checking if migration table exists")?;
481
482        // Get set of already applied migrations (empty if table doesn't exist)
483        let applied_migrations: HashSet<String> = if migration_table_exists {
484            self.get_applied_migrations_set(&self.safe_spawn_namespace())
485                .await
486                .context("Failed to get applied migrations set")?
487        } else {
488            HashSet::new()
489        };
490
491        // Create a config to use for generating using spawn templating
492        // engine.
493        let mut cfg = crate::config::Config::load("spawn.toml", &op, None)
494            .await
495            .context("Failed to load config for postgres psql")?;
496        let dbengtype = "psql".to_string();
497        cfg.database = Some(dbengtype.clone());
498        cfg.databases = HashMap::from([(dbengtype, self.db_config.clone())]);
499
500        // Apply each migration that hasn't been applied yet
501        for migration_path in available_migrations {
502            // Extract migration name from path (e.g., "migrations/001-base-migration-table/" -> "001-base-migration-table")
503            let migration_name = migration_path
504                .trim_end_matches('/')
505                .rsplit('/')
506                .next()
507                .unwrap_or(&migration_path);
508
509            // Skip if already applied
510            if applied_migrations.contains(migration_name) {
511                continue;
512            }
513
514            let migrator = crate::migrator::Migrator::new(&cfg, &migration_name, false);
515
516            // Load and render the migration
517            let variables = crate::variables::Variables::from_str(
518                "json",
519                &serde_json::json!({"schema": &self.spawn_schema}).to_string(),
520            )?;
521            let gen = migrator.generate_streaming(Some(variables)).await?;
522            let mut buffer = Vec::new();
523            gen.render_to_writer(&mut buffer)
524                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
525            let content = String::from_utf8(buffer)?;
526
527            // Apply the migration and record it
528            // Note: even for bootstrap, the first migration creates the tables,
529            // so they exist by the time we record the migration.
530            let write_fn: WriterFn =
531                Box::new(move |writer: &mut dyn Write| writer.write_all(content.as_bytes()));
532            match self
533                .apply_and_record_migration_v1(
534                    migration_name,
535                    write_fn,
536                    None, // pin_hash not used for engine migrations
537                    self.safe_spawn_namespace(),
538                    false, // no retry for internal schema migrations
539                )
540                .await
541            {
542                Ok(_) => {}
543                // For internal schema migrations, already applied is fine
544                Err(MigrationError::AlreadyApplied { .. }) => {}
545                // Other errors should propagate
546                Err(e) => return Err(e.into()),
547            }
548        }
549
550        Ok(())
551    }
552
553    /// Execute SQL and return stdout as a String.
554    /// Used for internal queries where we need to parse results.
555    async fn execute_sql(&self, query: &EscapedQuery, format: Option<&str>) -> Result<String> {
556        let query_str = query.as_str().to_string();
557        let format_owned = format.map(|s| s.to_string());
558
559        // Create a shared buffer to capture stdout
560        let stdout_buf = Arc::new(Mutex::new(Vec::new()));
561        let stdout_buf_clone = stdout_buf.clone();
562
563        self.execute_with_writer(
564            Box::new(move |writer| {
565                // Format settings if requested (QUIET is already set globally)
566                if let Some(fmt) = format_owned {
567                    writer.write_all(b"\\pset tuples_only on\n")?;
568                    writer.write_all(format!("\\pset format {}\n", fmt).as_bytes())?;
569                }
570                writer.write_all(query_str.as_bytes())?;
571                Ok(())
572            }),
573            Some(Box::new(SharedBufWriter(stdout_buf_clone))),
574        )
575        .await
576        .map_err(|e| anyhow!("SQL execution failed: {}", e))?;
577
578        let buf = stdout_buf.lock().unwrap();
579        Ok(String::from_utf8_lossy(&buf).to_string())
580    }
581
582    async fn migration_table_exists(&self) -> Result<bool> {
583        self.table_exists("migration").await
584    }
585
586    async fn migration_history_table_exists(&self) -> Result<bool> {
587        self.table_exists("migration_history").await
588    }
589
590    async fn table_exists(&self, table_name: &str) -> Result<bool> {
591        let safe_table_name = EscapedLiteral::new(table_name);
592        // Use type-safe escaped types - escaping happens at construction time
593        let query = sql_query!(
594            r#"
595            SELECT EXISTS (
596                SELECT FROM information_schema.tables
597                WHERE table_schema = {}
598                AND table_name = {}
599            );
600            "#,
601            self.spawn_schema_literal(),
602            safe_table_name
603        );
604
605        let output = self.execute_sql(&query, Some("csv")).await?;
606        // With tuples_only mode, output is just "t" or "f"
607        Ok(output.trim() == "t")
608    }
609
610    async fn get_applied_migrations_set(
611        &self,
612        namespace: &EscapedLiteral,
613    ) -> Result<HashSet<String>> {
614        let query = sql_query!(
615            "SELECT name FROM {}.migration WHERE namespace = {};",
616            self.spawn_schema_ident(),
617            namespace,
618        );
619
620        let output = self.execute_sql(&query, Some("csv")).await?;
621        let mut migrations = HashSet::new();
622
623        // With tuples_only mode, we get just the data rows (no headers)
624        for line in output.lines() {
625            let name = line.trim();
626            if !name.is_empty() {
627                migrations.insert(name.to_string());
628            }
629        }
630
631        Ok(migrations)
632    }
633
634    /// Get the latest migration history entry for a given migration name and namespace.
635    /// Returns None if no history entry exists.
636    async fn get_migration_status(
637        &self,
638        migration_name: &str,
639        namespace: &EscapedLiteral,
640    ) -> Result<Option<ExistingMigrationInfo>> {
641        let safe_migration_name = EscapedLiteral::new(migration_name);
642        let query = sql_query!(
643            r#"
644            SELECT m.name, m.namespace, mh.status_id_status, mh.activity_id_activity, encode(mh.checksum, 'hex')
645            FROM {}.migration_history mh
646            JOIN {}.migration m ON mh.migration_id_migration = m.migration_id
647            WHERE m.name = {} AND m.namespace = {}
648            ORDER BY mh.migration_history_id DESC
649            LIMIT 1;
650            "#,
651            self.spawn_schema_ident(),
652            self.spawn_schema_ident(),
653            safe_migration_name,
654            namespace
655        );
656
657        let output = self.execute_sql(&query, Some("csv")).await?;
658
659        // With tuples_only mode, we get just the data row (no headers).
660        // Parse CSV: name,namespace,status_id_status,activity_id_activity,checksum
661        let data_line = output.trim();
662        if data_line.is_empty() {
663            return Ok(None);
664        }
665
666        let parts: Vec<&str> = data_line.split(',').collect();
667        if parts.len() < 5 {
668            return Ok(None);
669        }
670
671        let status = match parts[2].trim() {
672            "SUCCESS" => MigrationHistoryStatus::Success,
673            "ATTEMPTED" => MigrationHistoryStatus::Attempted,
674            "FAILURE" => MigrationHistoryStatus::Failure,
675            _ => return Ok(None),
676        };
677
678        Ok(Some(ExistingMigrationInfo {
679            migration_name: parts[0].trim().to_string(),
680            namespace: parts[1].trim().to_string(),
681            last_status: status,
682            last_activity: parts[3].trim().to_string(),
683            checksum: parts[4].trim().to_string(),
684        }))
685    }
686
687    /// Build the SQL query for recording a migration in the tracking tables.
688    /// Records a migration in the tracking tables using its own psql session.
689    /// Used when there is no existing writer (e.g., adopt).
690    async fn record_migration(
691        &self,
692        migration_name: &str,
693        namespace: &EscapedLiteral,
694        status: MigrationStatus,
695        activity: MigrationActivity,
696        checksum: Option<&str>,
697        execution_time: Option<f32>,
698        pin_hash: Option<&str>,
699        description: Option<&str>,
700    ) -> MigrationResult<()> {
701        let record_query = build_record_migration_sql(
702            &self.spawn_schema,
703            migration_name,
704            namespace,
705            status,
706            activity,
707            checksum,
708            execution_time,
709            pin_hash,
710            description,
711        );
712
713        self.execute_with_writer(
714            Box::new(move |writer| {
715                writer.write_all(record_query.as_str().as_bytes())?;
716                Ok(())
717            }),
718            None,
719        )
720        .await
721        .map_err(|e| match e {
722            EngineError::ExecutionFailed { exit_code, stderr } => {
723                MigrationError::Database(anyhow!(
724                    "Failed to record migration (exit {}): {}",
725                    exit_code,
726                    stderr
727                ))
728            }
729            EngineError::Io(e) => MigrationError::Database(e.into()),
730        })?;
731
732        Ok(())
733    }
734
735    // This is versioned because if we change the schema significantly enough
736    // later, we'll have to still write earlier migrations to the table using
737    // the format of the migration table as it is at that point.
738    async fn apply_and_record_migration_v1(
739        &self,
740        migration_name: &str,
741        write_fn: WriterFn,
742        pin_hash: Option<String>,
743        namespace: EscapedLiteral,
744        retry: bool,
745    ) -> MigrationResult<String> {
746        // Check if migration already exists in history (skip if table doesn't exist yet)
747        let existing_status = if self
748            .migration_history_table_exists()
749            .await
750            .map_err(MigrationError::Database)?
751        {
752            self.get_migration_status(migration_name, &namespace)
753                .await
754                .map_err(MigrationError::Database)?
755        } else {
756            None
757        };
758
759        if let Some(info) = existing_status {
760            let name = migration_name.to_string();
761            let ns = namespace.raw_value().to_string();
762
763            match info.last_status {
764                MigrationHistoryStatus::Success => {
765                    return Err(MigrationError::AlreadyApplied {
766                        name,
767                        namespace: ns,
768                        info,
769                    });
770                }
771                MigrationHistoryStatus::Attempted | MigrationHistoryStatus::Failure => {
772                    if !retry {
773                        return Err(MigrationError::PreviousAttemptFailed {
774                            name,
775                            namespace: ns,
776                            status: info.last_status.clone(),
777                            info,
778                        });
779                    }
780                }
781            }
782        }
783
784        let start_time = Instant::now();
785        let lock_checksum = migration_lock_key();
786
787        // Use Arc<Mutex<>> to extract checksum from the closure
788        let checksum_result: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
789        let checksum_result_clone = checksum_result.clone();
790
791        // Session 1: Run the migration SQL only
792        let migration_result = self
793            .execute_with_writer(
794                Box::new(move |writer| {
795                    // Acquire advisory lock
796                    writer.write_all(
797                        format!(
798                            r#"DO $$ BEGIN IF NOT pg_try_advisory_lock({}) THEN RAISE EXCEPTION 'Could not acquire advisory lock'; END IF; END $$;"#,
799                            lock_checksum
800                        )
801                        .as_bytes(),
802                    )?;
803
804                    // Wrap the writer in a TeeWriter to compute checksum while streaming
805                    let mut tee_writer = TeeWriter::new(writer);
806
807                    // Execute the user's write function (streams migration SQL)
808                    write_fn(&mut tee_writer)?;
809
810                    // Extract the checksum
811                    let (_writer, content_checksum) = tee_writer.finish();
812                    let checksum_hex = format!("{:032x}", content_checksum);
813                    *checksum_result_clone.lock().unwrap() = Some(checksum_hex);
814
815                    Ok(())
816                }),
817                None,
818            )
819            .await;
820
821        let duration = start_time.elapsed().as_secs_f32();
822        let checksum_hex = checksum_result.lock().unwrap().clone();
823
824        // Determine status based on session 1 result
825        let (status, migration_error) = match &migration_result {
826            Ok(()) => (MigrationStatus::Success, None),
827            Err(EngineError::ExecutionFailed { exit_code, stderr }) => {
828                if stderr.contains("Could not acquire advisory lock") {
829                    return Err(MigrationError::AdvisoryLock(std::io::Error::new(
830                        std::io::ErrorKind::Other,
831                        stderr.clone(),
832                    )));
833                }
834                (
835                    MigrationStatus::Failure,
836                    Some(format!("psql exited with code {}: {}", exit_code, stderr)),
837                )
838            }
839            Err(EngineError::Io(e)) => {
840                return Err(MigrationError::Database(anyhow!(
841                    "IO error running migration: {}",
842                    e
843                )));
844            }
845        };
846
847        // Session 2: Record the outcome (success or failure)
848        let record_result = self
849            .record_migration(
850                migration_name,
851                &namespace,
852                status,
853                MigrationActivity::Apply,
854                checksum_hex.as_deref(),
855                Some(duration),
856                pin_hash.as_deref(),
857                None,
858            )
859            .await;
860
861        // Handle recording failure
862        if let Err(record_err) = record_result {
863            // If migration succeeded but recording failed, that's the critical state
864            if migration_error.is_none() {
865                return Err(MigrationError::NotRecorded {
866                    name: migration_name.to_string(),
867                    migration_outcome: MigrationStatus::Success,
868                    migration_error: None,
869                    recording_error: format!("{}", record_err),
870                });
871            }
872            // Both migration and recording failed
873            return Err(MigrationError::NotRecorded {
874                name: migration_name.to_string(),
875                migration_outcome: MigrationStatus::Failure,
876                migration_error: migration_error.clone(),
877                recording_error: format!("{}", record_err),
878            });
879        }
880
881        // If the migration itself failed (but was recorded), return that error
882        if let Some(err_msg) = migration_error {
883            return Err(MigrationError::Database(anyhow!(
884                "Migration '{}' failed: {}",
885                migration_name,
886                err_msg
887            )));
888        }
889
890        Ok("Migration applied successfully".to_string())
891    }
892}