1use crate::config::FolderPather;
6use crate::engine::{
7 resolve_command_spec, DatabaseConfig, Engine, EngineError, ExistingMigrationInfo,
8 MigrationActivity, MigrationError, MigrationHistoryStatus, MigrationResult, MigrationStatus,
9 StdoutWriter, WriterFn,
10};
11use crate::escape::{EscapedIdentifier, EscapedLiteral, EscapedQuery, InsecureRawSql};
12use crate::sql_query;
13use crate::store::pinner::latest::Latest;
14use crate::store::{operator_from_includedir, Store};
15use anyhow::{anyhow, Context, Result};
16use async_trait::async_trait;
17use include_dir::{include_dir, Dir};
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::io::Write;
21use std::process::Stdio;
22use std::sync::{Arc, Mutex};
23use std::time::Instant;
24use tokio::io::AsyncReadExt;
25use tokio::process::Command;
26use twox_hash::xxhash3_128;
27use twox_hash::XxHash64;
28
29pub fn migration_lock_key() -> i64 {
32 XxHash64::oneshot(1234, "SPAWN_MIGRATION_LOCK".as_bytes()) as i64
33}
34
/// Database engine that drives migrations by shelling out to the `psql` CLI
/// rather than using a native Postgres driver.
#[derive(Debug)]
pub struct PSQL {
    /// Fully resolved argv for invoking psql: `[program, arg, ...]`.
    psql_command: Vec<String>,
    /// Schema holding Spawn's own bookkeeping tables (`migration`, `migration_history`).
    spawn_schema: String,
    /// Original database config, retained so `update_schema` can rebuild a
    /// `Config` when self-applying the engine's internal migrations.
    db_config: DatabaseConfig,
}
42
/// Engine-internal migrations embedded at compile time; these create/upgrade
/// Spawn's own bookkeeping tables for the postgres-psql engine.
static PROJECT_DIR: Dir<'_> = include_dir!("./static/engine-migrations/postgres-psql");
/// Namespace under which the engine records its own internal migrations.
static SPAWN_NAMESPACE: &str = "spawn";
45
46impl PSQL {
47 pub async fn new(config: &DatabaseConfig) -> Result<Box<dyn Engine>> {
48 let command_spec = config
49 .command
50 .clone()
51 .ok_or(anyhow!("Command for database config must be defined"))?;
52
53 let psql_command = resolve_command_spec(command_spec).await?;
54
55 let eng = Box::new(Self {
56 psql_command,
57 spawn_schema: config.spawn_schema.clone(),
58 db_config: config.clone(),
59 });
60
61 eng.update_schema()
63 .await
64 .map_err(MigrationError::Database)?;
65
66 Ok(eng)
67 }
68
69 fn spawn_schema_literal(&self) -> EscapedLiteral {
70 EscapedLiteral::new(&self.spawn_schema)
71 }
72
73 fn spawn_schema_ident(&self) -> EscapedIdentifier {
74 EscapedIdentifier::new(&self.spawn_schema)
75 }
76
77 fn safe_spawn_namespace(&self) -> EscapedLiteral {
78 EscapedLiteral::new(SPAWN_NAMESPACE)
79 }
80}
81
#[async_trait]
impl Engine for PSQL {
    /// Spawn a `psql` child process, feed it SQL via `write_fn` over a pipe,
    /// and optionally forward its output to `stdout_writer`.
    ///
    /// Setup written before `write_fn` runs: `\set QUIET on`,
    /// `\pset pager off`, `\set ON_ERROR_STOP on` (so a failing statement
    /// makes psql exit non-zero).
    ///
    /// When `merge_stderr` is true *and* a stdout writer is present, stdout
    /// and stderr share one pipe so output interleaves in order; otherwise
    /// stderr is captured separately and surfaced in `ExecutionFailed`.
    async fn execute_with_writer(
        &self,
        write_fn: WriterFn,
        stdout_writer: StdoutWriter,
        merge_stderr: bool,
    ) -> Result<(), EngineError> {
        // Anonymous pipe: `reader` becomes the child's stdin; `writer` is
        // driven from a blocking task below.
        let (reader, mut writer) = std::io::pipe()?;

        // Merging stderr only makes sense if someone is consuming stdout.
        let merge = merge_stderr && stdout_writer.is_some();

        let (mut child, combined_read) = if merge {
            // One pipe, two write ends: child's stdout and stderr both feed
            // `out_read`, preserving interleaving.
            let (out_read, out_write) = std::io::pipe()?;
            let out_write_dup = out_write.try_clone()?;
            let child = Command::new(&self.psql_command[0])
                .args(&self.psql_command[1..])
                .stdin(Stdio::from(reader))
                .stdout(Stdio::from(out_write))
                .stderr(Stdio::from(out_write_dup))
                .spawn()
                .map_err(EngineError::Io)?;
            (child, Some(out_read))
        } else {
            // Without a consumer, discard stdout entirely; stderr is always
            // captured so failures can be reported.
            let stdout_config = if stdout_writer.is_some() {
                Stdio::piped()
            } else {
                Stdio::null()
            };
            let child = Command::new(&self.psql_command[0])
                .args(&self.psql_command[1..])
                .stdin(Stdio::from(reader))
                .stdout(stdout_config)
                .stderr(Stdio::piped())
                .spawn()
                .map_err(EngineError::Io)?;
            (child, None)
        };

        // Drain output concurrently with writing stdin; otherwise a full
        // stdout pipe would block the child and deadlock the writer.
        let stdout_handle = if let Some(mut stdout_dest) = stdout_writer {
            if let Some(mut combined_read) = combined_read {
                // Merged pipe is a std (blocking) reader — drain it on the
                // blocking pool, then forward the bytes asynchronously.
                Some(tokio::task::spawn(async move {
                    let buf = tokio::task::spawn_blocking(move || {
                        use std::io::Read;
                        let mut buf = Vec::new();
                        let _ = combined_read.read_to_end(&mut buf);
                        buf
                    })
                    .await
                    .unwrap_or_default();
                    use tokio::io::AsyncWriteExt;
                    let _ = stdout_dest.write_all(&buf).await;
                }))
            } else {
                // Non-merged path: child stdout is a tokio pipe.
                let mut stdout = child.stdout.take().expect("stdout should be piped");
                Some(tokio::task::spawn(async move {
                    use tokio::io::AsyncWriteExt;
                    let mut buf = Vec::new();
                    let _ = stdout.read_to_end(&mut buf).await;
                    let _ = stdout_dest.write_all(&buf).await;
                }))
            }
        } else {
            None
        };

        // stderr is only piped in the non-merged case; collect it for the
        // error report below.
        let stderr_handle = if let Some(mut stderr) = child.stderr.take() {
            Some(tokio::spawn(async move {
                let mut buf = Vec::new();
                let _ = stderr.read_to_end(&mut buf).await;
                buf
            }))
        } else {
            None
        };

        // Write stdin on the blocking pool; when this closure returns,
        // `writer` is dropped, closing the pipe so psql sees EOF and exits.
        let writer_handle = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
            writer.write_all(b"\\set QUIET on\n")?;
            writer.write_all(b"\\pset pager off\n")?;
            writer.write_all(b"\\set ON_ERROR_STOP on\n")?;

            write_fn(&mut writer)?;

            Ok(())
        });

        // First `?` surfaces a join error, second surfaces the IO error.
        writer_handle
            .await
            .map_err(|e| EngineError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))??;

        if let Some(handle) = stdout_handle {
            let _ = handle.await;
        }

        let status = child.wait().await?;
        let stderr_bytes = match stderr_handle {
            Some(handle) => handle.await.unwrap_or_default(),
            None => Vec::new(),
        };

        if !status.success() {
            return Err(EngineError::ExecutionFailed {
                // -1 stands in for "killed by signal" (no exit code).
                exit_code: status.code().unwrap_or(-1),
                stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
            });
        }

        Ok(())
    }

    /// Apply `migration_name` in `namespace` and record the outcome.
    /// Thin wrapper over `apply_and_record_migration_v1`.
    async fn migration_apply(
        &self,
        migration_name: &str,
        write_fn: WriterFn,
        pin_hash: Option<String>,
        namespace: &str,
        retry: bool,
    ) -> MigrationResult<String> {
        self.apply_and_record_migration_v1(
            migration_name,
            write_fn,
            pin_hash,
            EscapedLiteral::new(namespace),
            retry,
        )
        .await
    }

    /// Mark an externally-applied migration as successful without running any
    /// SQL. Refuses to adopt a migration whose latest history row is already
    /// `Success`; a prior `Attempted`/`Failure` row does not block adoption.
    async fn migration_adopt(
        &self,
        migration_name: &str,
        namespace: &str,
        description: &str,
    ) -> MigrationResult<String> {
        let namespace_lit = EscapedLiteral::new(namespace);

        let existing_status = self
            .get_migration_status(migration_name, &namespace_lit)
            .await
            .map_err(MigrationError::Database)?;

        if let Some(info) = existing_status {
            let name = migration_name.to_string();
            let ns = namespace_lit.raw_value().to_string();

            match info.last_status {
                MigrationHistoryStatus::Success => {
                    return Err(MigrationError::AlreadyApplied {
                        name,
                        namespace: ns,
                        info,
                    });
                }
                // A failed/attempted previous run may still be adopted.
                MigrationHistoryStatus::Attempted | MigrationHistoryStatus::Failure => {}
            }
        }

        self.record_migration(
            migration_name,
            &namespace_lit,
            MigrationStatus::Success,
            MigrationActivity::Adopt,
            None, None, None, Some(description),
        )
        .await?;

        Ok(format!(
            "Migration '{}' adopted successfully",
            migration_name
        ))
    }

    /// List each known migration with its most recent history row, optionally
    /// filtered by namespace, sorted by migration name.
    ///
    /// The query returns a single JSON array (via `json_agg`) which is parsed
    /// here, avoiding fragile CSV parsing of free-form fields.
    async fn get_migrations_from_db(
        &self,
        namespace: Option<&str>,
    ) -> MigrationResult<Vec<crate::engine::MigrationDbInfo>> {
        use serde::Deserialize;

        let namespace_lit = namespace.map(|ns| EscapedLiteral::new(ns));
        // `{} IS NULL OR ...` makes the namespace filter optional in SQL:
        // a NULL literal disables the filter entirely.
        let query = sql_query!(
            r#"
            SELECT json_agg(row_to_json(t))
            FROM (
                SELECT DISTINCT ON (m.name)
                    m.name as migration_name,
                    mh.status_id_status as last_status,
                    mh.activity_id_activity as last_activity,
                    encode(mh.checksum, 'hex') as checksum
                FROM {}.migration m
                LEFT JOIN {}.migration_history mh ON m.migration_id = mh.migration_id_migration
                WHERE {} IS NULL OR m.namespace = {}
                ORDER BY m.name, mh.created_at DESC NULLS LAST
            ) t
            "#,
            self.spawn_schema_ident(),
            self.spawn_schema_ident(),
            namespace_lit,
            namespace_lit
        );

        let output = self
            .execute_sql(&query, Some("unaligned"))
            .await
            .map_err(MigrationError::Database)?;

        /// Shape of one row inside the `json_agg` array.
        #[derive(Deserialize)]
        struct MigrationRow {
            migration_name: String,
            // NULL when a migration row has no history yet (LEFT JOIN).
            last_status: Option<String>,
            last_activity: Option<String>,
            checksum: Option<String>,
        }

        let json_str = output.trim();

        // `json_agg` over zero rows yields SQL NULL, printed as "null".
        if json_str == "null" || json_str.is_empty() {
            return Ok(Vec::new());
        }

        let rows: Vec<MigrationRow> = serde_json::from_str(json_str).map_err(|e| {
            MigrationError::Database(anyhow::anyhow!(
                "Failed to parse JSON from database (output: '{}'): {}",
                json_str,
                e
            ))
        })?;

        let mut results: Vec<crate::engine::MigrationDbInfo> = rows
            .into_iter()
            .map(|row| {
                // Unknown status strings map to None rather than erroring.
                let status = row
                    .last_status
                    .as_deref()
                    .and_then(MigrationHistoryStatus::from_str);

                crate::engine::MigrationDbInfo {
                    migration_name: row.migration_name,
                    last_status: status,
                    last_activity: row.last_activity,
                    checksum: row.checksum,
                }
            })
            .collect();

        results.sort_by(|a, b| a.migration_name.cmp(&b.migration_name));

        Ok(results)
    }
}
363
364struct SharedBufWriter(Arc<Mutex<Vec<u8>>>);
366
367impl tokio::io::AsyncWrite for SharedBufWriter {
368 fn poll_write(
369 self: std::pin::Pin<&mut Self>,
370 _cx: &mut std::task::Context<'_>,
371 buf: &[u8],
372 ) -> std::task::Poll<std::io::Result<usize>> {
373 self.0.lock().unwrap().extend_from_slice(buf);
374 std::task::Poll::Ready(Ok(buf.len()))
375 }
376
377 fn poll_flush(
378 self: std::pin::Pin<&mut Self>,
379 _cx: &mut std::task::Context<'_>,
380 ) -> std::task::Poll<std::io::Result<()>> {
381 std::task::Poll::Ready(Ok(()))
382 }
383
384 fn poll_shutdown(
385 self: std::pin::Pin<&mut Self>,
386 _cx: &mut std::task::Context<'_>,
387 ) -> std::task::Poll<std::io::Result<()>> {
388 std::task::Poll::Ready(Ok(()))
389 }
390}
391
/// `Write` wrapper that feeds the bytes it forwards to `inner` into an
/// XXH3-128 hasher, so a content checksum falls out of a normal write pass.
struct TeeWriter<W: Write> {
    inner: W,
    hasher: xxhash3_128::Hasher,
}
398
impl<W: Write> TeeWriter<W> {
    /// Wrap `inner` with a fresh hasher state.
    fn new(inner: W) -> Self {
        Self {
            inner,
            hasher: xxhash3_128::Hasher::new(),
        }
    }

    /// Consume the tee, returning the wrapped writer and the 128-bit digest
    /// of everything hashed so far.
    fn finish(self) -> (W, u128) {
        (self.inner, self.hasher.finish_128())
    }
}
411
412impl<W: Write> Write for TeeWriter<W> {
413 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
414 self.hasher.write(buf);
415 self.inner.write(buf)
416 }
417
418 fn flush(&mut self) -> std::io::Result<()> {
419 self.inner.flush()
420 }
421}
422
/// Build the transaction that upserts a `migration` row and appends a
/// `migration_history` entry for it.
///
/// The `ON CONFLICT ... DO UPDATE SET name = EXCLUDED.name` is a no-op update
/// whose purpose is to make `RETURNING migration_id` yield the id for an
/// already-existing row as well as for a fresh insert.
///
/// User-influenced values go through `EscapedLiteral`/`EscapedIdentifier`.
/// The two `InsecureRawSql` fragments are safe by construction: `checksum` is
/// hex produced internally (from the tee hasher) and `execution_time` is
/// formatted from an `f32` — neither carries attacker-controlled text.
fn build_record_migration_sql(
    spawn_schema: &str,
    migration_name: &str,
    namespace: &EscapedLiteral,
    status: MigrationStatus,
    activity: MigrationActivity,
    checksum: Option<&str>,
    execution_time: Option<f32>,
    pin_hash: Option<&str>,
    description: Option<&str>,
) -> EscapedQuery {
    let schema_ident = EscapedIdentifier::new(spawn_schema);
    let safe_migration_name = EscapedLiteral::new(migration_name);
    let safe_status = EscapedLiteral::new(status.as_str());
    let safe_activity = EscapedLiteral::new(activity.as_str());
    let safe_description = EscapedLiteral::new(description.unwrap_or(""));
    // Missing checksum is stored as decode('', 'hex') — an empty bytea,
    // not NULL.
    let checksum_expr = checksum
        .map(|c| format!("decode('{}', 'hex')", c))
        .unwrap_or_else(|| "decode('', 'hex')".to_string());
    let checksum_raw = InsecureRawSql::new(&checksum_expr);
    // None here presumably renders as SQL NULL via the macro's Option
    // handling — TODO confirm against sql_query! semantics.
    let safe_pin_hash = pin_hash.map(|h| EscapedLiteral::new(h));

    let duration_interval = execution_time
        .map(|d| InsecureRawSql::new(&format!("INTERVAL '{} second'", d)))
        .unwrap_or_else(|| InsecureRawSql::new("INTERVAL '0 second'"));

    sql_query!(
        r#"
BEGIN;
WITH inserted_migration AS (
    INSERT INTO {}.migration (name, namespace) VALUES ({}, {})
    ON CONFLICT (name, namespace) DO UPDATE SET name = EXCLUDED.name
    RETURNING migration_id
)
INSERT INTO {}.migration_history (
    migration_id_migration,
    activity_id_activity,
    created_by,
    description,
    status_note,
    status_id_status,
    checksum,
    execution_time,
    pin_hash
)
SELECT
    migration_id,
    {},
    'unused',
    {},
    '',
    {},
    {},
    {},
    {}
FROM inserted_migration;
COMMIT;
"#,
        schema_ident,
        safe_migration_name,
        namespace,
        schema_ident,
        safe_activity,
        safe_description,
        safe_status,
        checksum_raw,
        duration_interval,
        safe_pin_hash,
    )
}
496
497impl PSQL {
    /// Apply the engine's embedded internal migrations (from `PROJECT_DIR`)
    /// that have not yet been recorded under the "spawn" namespace.
    ///
    /// Runs at construction time (`PSQL::new`), so the bookkeeping tables
    /// exist before any user migration is attempted. Races with concurrent
    /// runners are tolerated: an `AlreadyApplied` result is ignored.
    pub async fn update_schema(&self) -> Result<()> {
        let op = operator_from_includedir(&PROJECT_DIR, None)
            .await
            .context("Failed to create operator from included directory")?;

        // Empty pin / empty spawn_folder: the embedded dir is the whole store.
        let pinner = Latest::new("").context("Failed to create Latest pinner")?;
        let pather = FolderPather {
            spawn_folder: "".to_string(),
        };
        let store = Store::new(Box::new(pinner), op.clone(), pather)
            .context("Failed to create store for update_schema")?;

        let available_migrations = store
            .list_migrations()
            .await
            .context("Failed to list migrations")?;

        // On a fresh database the bookkeeping table itself does not exist yet;
        // querying it would fail, so treat that as "nothing applied".
        let migration_table_exists = self
            .migration_table_exists()
            .await
            .context("Failed checking if migration table exists")?;

        let applied_migrations: HashSet<String> = if migration_table_exists {
            self.get_applied_migrations_set(&self.safe_spawn_namespace())
                .await
                .context("Failed to get applied migrations set")?
        } else {
            HashSet::new()
        };

        // Build a Config pointing at this engine's own db config so the
        // Migrator renders the embedded migrations against it.
        let mut cfg = crate::config::Config::load("spawn.toml", &op, None)
            .await
            .context("Failed to load config for postgres psql")?;
        let dbengtype = "psql".to_string();
        cfg.database = Some(dbengtype.clone());
        cfg.databases = HashMap::from([(dbengtype, self.db_config.clone())]);

        for migration_path in available_migrations {
            // Last path segment (trailing slash tolerated) is the migration name.
            let migration_name = migration_path
                .trim_end_matches('/')
                .rsplit('/')
                .next()
                .unwrap_or(&migration_path);

            if applied_migrations.contains(migration_name) {
                continue;
            }

            let migrator = crate::migrator::Migrator::new(&cfg, &migration_name, false);

            // Templates receive the spawn schema as the "schema" variable.
            let variables = crate::variables::Variables::from_str(
                "json",
                &serde_json::json!({"schema": &self.spawn_schema}).to_string(),
            )?;
            let gen = migrator.generate_streaming(Some(variables)).await?;
            let mut buffer = Vec::new();
            gen.render_to_writer(&mut buffer)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
            let content = String::from_utf8(buffer)?;

            let write_fn: WriterFn =
                Box::new(move |writer: &mut dyn Write| writer.write_all(content.as_bytes()));
            match self
                .apply_and_record_migration_v1(
                    migration_name,
                    write_fn,
                    None, // internal migrations carry no pin hash
                    self.safe_spawn_namespace(),
                    false, // no retry
                )
                .await
            {
                Ok(_) => {}
                // Another runner got there first — fine.
                Err(MigrationError::AlreadyApplied { .. }) => {}
                Err(e) => return Err(e.into()),
            }
        }

        Ok(())
    }
595
596 async fn execute_sql(&self, query: &EscapedQuery, format: Option<&str>) -> Result<String> {
599 let query_str = query.as_str().to_string();
600 let format_owned = format.map(|s| s.to_string());
601
602 let stdout_buf = Arc::new(Mutex::new(Vec::new()));
604 let stdout_buf_clone = stdout_buf.clone();
605
606 self.execute_with_writer(
607 Box::new(move |writer| {
608 if let Some(fmt) = format_owned {
610 writer.write_all(b"\\pset tuples_only on\n")?;
611 writer.write_all(format!("\\pset format {}\n", fmt).as_bytes())?;
612 }
613 writer.write_all(query_str.as_bytes())?;
614 Ok(())
615 }),
616 Some(Box::new(SharedBufWriter(stdout_buf_clone))),
617 false, )
619 .await
620 .map_err(|e| anyhow!("SQL execution failed: {}", e))?;
621
622 let buf = stdout_buf.lock().unwrap();
623 Ok(String::from_utf8_lossy(&buf).to_string())
624 }
625
    /// True when Spawn's `migration` bookkeeping table exists in the spawn schema.
    async fn migration_table_exists(&self) -> Result<bool> {
        self.table_exists("migration").await
    }
629
    /// True when Spawn's `migration_history` bookkeeping table exists in the spawn schema.
    async fn migration_history_table_exists(&self) -> Result<bool> {
        self.table_exists("migration_history").await
    }
633
634 async fn table_exists(&self, table_name: &str) -> Result<bool> {
635 let safe_table_name = EscapedLiteral::new(table_name);
636 let query = sql_query!(
638 r#"
639 SELECT EXISTS (
640 SELECT FROM information_schema.tables
641 WHERE table_schema = {}
642 AND table_name = {}
643 );
644 "#,
645 self.spawn_schema_literal(),
646 safe_table_name
647 );
648
649 let output = self.execute_sql(&query, Some("csv")).await?;
650 Ok(output.trim() == "t")
652 }
653
654 async fn get_applied_migrations_set(
655 &self,
656 namespace: &EscapedLiteral,
657 ) -> Result<HashSet<String>> {
658 let query = sql_query!(
659 "SELECT name FROM {}.migration WHERE namespace = {};",
660 self.spawn_schema_ident(),
661 namespace,
662 );
663
664 let output = self.execute_sql(&query, Some("csv")).await?;
665 let mut migrations = HashSet::new();
666
667 for line in output.lines() {
669 let name = line.trim();
670 if !name.is_empty() {
671 migrations.insert(name.to_string());
672 }
673 }
674
675 Ok(migrations)
676 }
677
    /// Look up the most recent `migration_history` row for
    /// (`migration_name`, `namespace`).
    ///
    /// Returns `Ok(None)` when the migration has never been recorded, when
    /// the CSV row has fewer than five columns, or when the status value is
    /// not one of SUCCESS/ATTEMPTED/FAILURE.
    ///
    /// NOTE(review): parsing uses a naive `split(',')`; a comma inside a
    /// migration name or namespace would shift the columns — TODO confirm
    /// names are comma-free upstream.
    async fn get_migration_status(
        &self,
        migration_name: &str,
        namespace: &EscapedLiteral,
    ) -> Result<Option<ExistingMigrationInfo>> {
        let safe_migration_name = EscapedLiteral::new(migration_name);
        // Highest migration_history_id == latest attempt.
        let query = sql_query!(
            r#"
            SELECT m.name, m.namespace, mh.status_id_status, mh.activity_id_activity, encode(mh.checksum, 'hex')
            FROM {}.migration_history mh
            JOIN {}.migration m ON mh.migration_id_migration = m.migration_id
            WHERE m.name = {} AND m.namespace = {}
            ORDER BY mh.migration_history_id DESC
            LIMIT 1;
            "#,
            self.spawn_schema_ident(),
            self.spawn_schema_ident(),
            safe_migration_name,
            namespace
        );

        let output = self.execute_sql(&query, Some("csv")).await?;

        let data_line = output.trim();
        if data_line.is_empty() {
            return Ok(None);
        }

        let parts: Vec<&str> = data_line.split(',').collect();
        if parts.len() < 5 {
            return Ok(None);
        }

        let status = match parts[2].trim() {
            "SUCCESS" => MigrationHistoryStatus::Success,
            "ATTEMPTED" => MigrationHistoryStatus::Attempted,
            "FAILURE" => MigrationHistoryStatus::Failure,
            // Unknown status string: treat as "no usable record".
            _ => return Ok(None),
        };

        Ok(Some(ExistingMigrationInfo {
            migration_name: parts[0].trim().to_string(),
            namespace: parts[1].trim().to_string(),
            last_status: status,
            last_activity: parts[3].trim().to_string(),
            checksum: parts[4].trim().to_string(),
        }))
    }
730
731 async fn record_migration(
735 &self,
736 migration_name: &str,
737 namespace: &EscapedLiteral,
738 status: MigrationStatus,
739 activity: MigrationActivity,
740 checksum: Option<&str>,
741 execution_time: Option<f32>,
742 pin_hash: Option<&str>,
743 description: Option<&str>,
744 ) -> MigrationResult<()> {
745 let record_query = build_record_migration_sql(
746 &self.spawn_schema,
747 migration_name,
748 namespace,
749 status,
750 activity,
751 checksum,
752 execution_time,
753 pin_hash,
754 description,
755 );
756
757 self.execute_with_writer(
758 Box::new(move |writer| {
759 writer.write_all(record_query.as_str().as_bytes())?;
760 Ok(())
761 }),
762 None,
763 false, )
765 .await
766 .map_err(|e| match e {
767 EngineError::ExecutionFailed { exit_code, stderr } => {
768 MigrationError::Database(anyhow!(
769 "Failed to record migration (exit {}): {}",
770 exit_code,
771 stderr
772 ))
773 }
774 EngineError::Io(e) => MigrationError::Database(e.into()),
775 })?;
776
777 Ok(())
778 }
779
    /// Apply a migration under a Postgres advisory lock, checksum its content
    /// on the fly, and record the outcome in `migration_history`.
    ///
    /// Pre-check: unless `retry` is set, any existing history row blocks the
    /// run (`AlreadyApplied` for a prior success, `PreviousAttemptFailed`
    /// otherwise). The check is skipped entirely when the history table does
    /// not exist yet (fresh database / internal bootstrap).
    ///
    /// Error precedence: advisory-lock contention > IO error > failure to
    /// record the outcome (`NotRecorded`, carrying the migration outcome) >
    /// the migration's own failure.
    async fn apply_and_record_migration_v1(
        &self,
        migration_name: &str,
        write_fn: WriterFn,
        pin_hash: Option<String>,
        namespace: EscapedLiteral,
        retry: bool,
    ) -> MigrationResult<String> {
        let existing_status = if self
            .migration_history_table_exists()
            .await
            .map_err(MigrationError::Database)?
        {
            self.get_migration_status(migration_name, &namespace)
                .await
                .map_err(MigrationError::Database)?
        } else {
            None
        };

        if let Some(info) = existing_status {
            if !retry {
                let name = migration_name.to_string();
                let ns = namespace.raw_value().to_string();

                match info.last_status {
                    MigrationHistoryStatus::Success => {
                        return Err(MigrationError::AlreadyApplied {
                            name,
                            namespace: ns,
                            info,
                        });
                    }
                    MigrationHistoryStatus::Attempted | MigrationHistoryStatus::Failure => {
                        return Err(MigrationError::PreviousAttemptFailed {
                            name,
                            namespace: ns,
                            status: info.last_status.clone(),
                            info,
                        });
                    }
                }
            }
        }

        let start_time = Instant::now();
        let lock_checksum = migration_lock_key();

        // The checksum is computed inside the writer closure; smuggle it out
        // through a shared slot since the closure cannot return it directly.
        let checksum_result: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
        let checksum_result_clone = checksum_result.clone();

        let migration_result = self
            .execute_with_writer(
                Box::new(move |writer| {
                    // Session-level advisory lock serializes concurrent
                    // runners; the sentinel message in RAISE EXCEPTION is
                    // matched against stderr below. Presumably the lock is
                    // released when the psql session exits — no explicit
                    // unlock is issued.
                    writer.write_all(
                        format!(
                            r#"DO $$ BEGIN IF NOT pg_try_advisory_lock({}) THEN RAISE EXCEPTION 'Could not acquire advisory lock'; END IF; END $$;"#,
                            lock_checksum
                        )
                        .as_bytes(),
                    )?;

                    // Tee the migration SQL through a hasher so the recorded
                    // checksum covers exactly the bytes sent to psql.
                    let mut tee_writer = TeeWriter::new(writer);

                    write_fn(&mut tee_writer)?;

                    let (_writer, content_checksum) = tee_writer.finish();
                    // 128-bit digest rendered as 32 hex chars (zero-padded).
                    let checksum_hex = format!("{:032x}", content_checksum);
                    *checksum_result_clone.lock().unwrap() = Some(checksum_hex);

                    Ok(())
                }),
                None,
                false,
            )
            .await;

        let duration = start_time.elapsed().as_secs_f32();
        let checksum_hex = checksum_result.lock().unwrap().clone();

        // Classify the run: success, a recordable failure, or an error that
        // short-circuits without recording (lock contention, IO).
        let (status, migration_error) = match &migration_result {
            Ok(()) => (MigrationStatus::Success, None),
            Err(EngineError::ExecutionFailed { exit_code, stderr }) => {
                if stderr.contains("Could not acquire advisory lock") {
                    return Err(MigrationError::AdvisoryLock(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        stderr.clone(),
                    )));
                }
                (
                    MigrationStatus::Failure,
                    Some(format!("psql exited with code {}: {}", exit_code, stderr)),
                )
            }
            Err(EngineError::Io(e)) => {
                return Err(MigrationError::Database(anyhow!(
                    "IO error running migration: {}",
                    e
                )));
            }
        };

        // Record success AND failure outcomes in history.
        let record_result = self
            .record_migration(
                migration_name,
                &namespace,
                status,
                MigrationActivity::Apply,
                checksum_hex.as_deref(),
                Some(duration),
                pin_hash.as_deref(),
                None,
            )
            .await;

        // Recording failed: surface NotRecorded, preserving whether the
        // migration itself had succeeded or failed.
        if let Err(record_err) = record_result {
            if migration_error.is_none() {
                return Err(MigrationError::NotRecorded {
                    name: migration_name.to_string(),
                    migration_outcome: MigrationStatus::Success,
                    migration_error: None,
                    recording_error: format!("{}", record_err),
                });
            }
            return Err(MigrationError::NotRecorded {
                name: migration_name.to_string(),
                migration_outcome: MigrationStatus::Failure,
                migration_error: migration_error.clone(),
                recording_error: format!("{}", record_err),
            });
        }

        // Recorded fine, but the migration itself failed.
        if let Some(err_msg) = migration_error {
            return Err(MigrationError::Database(anyhow!(
                "Migration '{}' failed: {}",
                migration_name,
                err_msg
            )));
        }

        Ok("Migration applied successfully".to_string())
    }
938}