1use crate::config::FolderPather;
6use crate::engine::{
7 resolve_command_spec, DatabaseConfig, Engine, EngineError, ExistingMigrationInfo,
8 MigrationActivity, MigrationError, MigrationHistoryStatus, MigrationResult, MigrationStatus,
9 StdoutWriter, WriterFn,
10};
11use crate::escape::{EscapedIdentifier, EscapedLiteral, EscapedQuery, InsecureRawSql};
12use crate::sql_query;
13use crate::store::pinner::latest::Latest;
14use crate::store::{operator_from_includedir, Store};
15use anyhow::{anyhow, Context, Result};
16use async_trait::async_trait;
17use include_dir::{include_dir, Dir};
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::io::Write;
21use std::process::Stdio;
22use std::sync::{Arc, Mutex};
23use std::time::Instant;
24use tokio::io::AsyncReadExt;
25use tokio::process::Command;
26use twox_hash::xxhash3_128;
27use twox_hash::XxHash64;
28
29pub fn migration_lock_key() -> i64 {
32 XxHash64::oneshot(1234, "SPAWN_MIGRATION_LOCK".as_bytes()) as i64
33}
34
/// Postgres engine that shells out to the `psql` CLI instead of using a
/// native database driver. SQL is streamed to the child's stdin.
#[derive(Debug)]
pub struct PSQL {
    // Fully resolved command line: element 0 is the psql binary, the rest
    // are its arguments (see execute_with_writer).
    psql_command: Vec<String>,
    // Name of the schema holding spawn's migration bookkeeping tables.
    spawn_schema: String,
    // Full config retained so update_schema() can rebuild a Config that
    // points back at this engine.
    db_config: DatabaseConfig,
}
42
// SQL migrations for spawn's own bookkeeping tables, embedded at build time.
static PROJECT_DIR: Dir<'_> = include_dir!("./static/engine-migrations/postgres-psql");
// Reserved namespace under which the engine records its own migrations.
static SPAWN_NAMESPACE: &str = "spawn";
45
46impl PSQL {
47 pub async fn new(config: &DatabaseConfig) -> Result<Box<dyn Engine>> {
48 let command_spec = config
49 .command
50 .clone()
51 .ok_or(anyhow!("Command for database config must be defined"))?;
52
53 let psql_command = resolve_command_spec(command_spec).await?;
54
55 let eng = Box::new(Self {
56 psql_command,
57 spawn_schema: config.spawn_schema.clone(),
58 db_config: config.clone(),
59 });
60
61 eng.update_schema()
63 .await
64 .map_err(MigrationError::Database)?;
65
66 Ok(eng)
67 }
68
69 fn spawn_schema_literal(&self) -> EscapedLiteral {
70 EscapedLiteral::new(&self.spawn_schema)
71 }
72
73 fn spawn_schema_ident(&self) -> EscapedIdentifier {
74 EscapedIdentifier::new(&self.spawn_schema)
75 }
76
77 fn safe_spawn_namespace(&self) -> EscapedLiteral {
78 EscapedLiteral::new(SPAWN_NAMESPACE)
79 }
80}
81
#[async_trait]
impl Engine for PSQL {
    /// Streams SQL into a spawned `psql` child process.
    ///
    /// `write_fn` produces the SQL on the child's stdin. If `stdout_writer`
    /// is provided, the child's stdout is piped into it; otherwise stdout is
    /// discarded. stderr is always captured and attached to failures.
    async fn execute_with_writer(
        &self,
        write_fn: WriterFn,
        stdout_writer: StdoutWriter,
    ) -> Result<(), EngineError> {
        // Anonymous OS pipe: read end becomes the child's stdin; the write
        // end is fed from a blocking task below.
        let (reader, mut writer) = std::io::pipe()?;

        let stdout_config = if stdout_writer.is_some() {
            Stdio::piped()
        } else {
            Stdio::null()
        };

        let mut child = Command::new(&self.psql_command[0])
            .args(&self.psql_command[1..])
            .stdin(Stdio::from(reader))
            .stdout(stdout_config)
            .stderr(Stdio::piped())
            .spawn()
            .map_err(EngineError::Io)?;

        // Drain stdout on a separate task so the child never blocks on a
        // full stdout pipe while we are still writing its stdin.
        let stdout_handle = if let Some(mut stdout_dest) = stdout_writer {
            let mut stdout = child.stdout.take().expect("stdout should be piped");
            Some(tokio::spawn(async move {
                use tokio::io::AsyncWriteExt;
                let mut buf = Vec::new();
                let _ = stdout.read_to_end(&mut buf).await;
                let _ = stdout_dest.write_all(&buf).await;
            }))
        } else {
            None
        };

        // stderr is always collected so it can be surfaced in error results.
        let mut stderr = child.stderr.take().expect("stderr should be piped");
        let stderr_handle = tokio::spawn(async move {
            let mut buf = Vec::new();
            let _ = stderr.read_to_end(&mut buf).await;
            buf
        });

        // write_fn uses std::io::Write, so feed stdin from a blocking task.
        // Dropping `writer` when the closure returns closes the child's
        // stdin, which lets psql terminate.
        let writer_handle = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
            // Session preamble: quiet mode, no pager, abort on first error.
            writer.write_all(b"\\set QUIET on\n")?;
            writer.write_all(b"\\pset pager off\n")?;
            writer.write_all(b"\\set ON_ERROR_STOP on\n")?;

            write_fn(&mut writer)?;

            Ok(())
        });

        // First ? handles the JoinError, second ? the io::Result.
        writer_handle
            .await
            .map_err(|e| EngineError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))??;

        if let Some(handle) = stdout_handle {
            let _ = handle.await;
        }

        let status = child.wait().await?;
        let stderr_bytes = stderr_handle.await.unwrap_or_default();

        if !status.success() {
            return Err(EngineError::ExecutionFailed {
                // code() is None when the child was killed by a signal.
                exit_code: status.code().unwrap_or(-1),
                stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
            });
        }

        Ok(())
    }

    /// Applies a user migration in `namespace` and records the outcome;
    /// thin wrapper over apply_and_record_migration_v1.
    async fn migration_apply(
        &self,
        migration_name: &str,
        write_fn: WriterFn,
        pin_hash: Option<String>,
        namespace: &str,
        retry: bool,
    ) -> MigrationResult<String> {
        self.apply_and_record_migration_v1(
            migration_name,
            write_fn,
            pin_hash,
            EscapedLiteral::new(namespace),
            retry,
        )
        .await
    }

    /// Records an externally-applied migration as successful without
    /// executing any SQL for it.
    ///
    /// # Errors
    /// `AlreadyApplied` when the latest history entry is SUCCESS. Prior
    /// ATTEMPTED/FAILURE entries do not block adoption.
    async fn migration_adopt(
        &self,
        migration_name: &str,
        namespace: &str,
        description: &str,
    ) -> MigrationResult<String> {
        let namespace_lit = EscapedLiteral::new(namespace);

        let existing_status = self
            .get_migration_status(migration_name, &namespace_lit)
            .await
            .map_err(MigrationError::Database)?;

        if let Some(info) = existing_status {
            let name = migration_name.to_string();
            let ns = namespace_lit.raw_value().to_string();

            match info.last_status {
                MigrationHistoryStatus::Success => {
                    return Err(MigrationError::AlreadyApplied {
                        name,
                        namespace: ns,
                        info,
                    });
                }
                // A failed or half-finished run may be adopted over.
                MigrationHistoryStatus::Attempted | MigrationHistoryStatus::Failure => {}
            }
        }

        // checksum / execution_time / pin_hash are None for adoptions: no
        // SQL was streamed, so there is nothing to checksum or time.
        self.record_migration(
            migration_name,
            &namespace_lit,
            MigrationStatus::Success,
            MigrationActivity::Adopt,
            None, None, None, Some(description),
        )
        .await?;

        Ok(format!(
            "Migration '{}' adopted successfully",
            migration_name
        ))
    }

    /// Lists every migration known to the database with its most recent
    /// status/activity/checksum, optionally filtered to `namespace`.
    /// Results are sorted by migration name.
    async fn get_migrations_from_db(
        &self,
        namespace: Option<&str>,
    ) -> MigrationResult<Vec<crate::engine::MigrationDbInfo>> {
        use serde::Deserialize;

        let namespace_lit = namespace.map(|ns| EscapedLiteral::new(ns));
        // json_agg collapses the result set into one JSON value, so we can
        // parse a single blob instead of splitting csv rows by hand.
        // DISTINCT ON + ORDER BY picks the newest history row per migration.
        let query = sql_query!(
            r#"
            SELECT json_agg(row_to_json(t))
            FROM (
                SELECT DISTINCT ON (m.name)
                    m.name as migration_name,
                    mh.status_id_status as last_status,
                    mh.activity_id_activity as last_activity,
                    encode(mh.checksum, 'hex') as checksum
                FROM {}.migration m
                LEFT JOIN {}.migration_history mh ON m.migration_id = mh.migration_id_migration
                WHERE {} IS NULL OR m.namespace = {}
                ORDER BY m.name, mh.created_at DESC NULLS LAST
            ) t
            "#,
            self.spawn_schema_ident(),
            self.spawn_schema_ident(),
            namespace_lit,
            namespace_lit
        );

        let output = self
            .execute_sql(&query, Some("unaligned"))
            .await
            .map_err(MigrationError::Database)?;

        // Shape of each element in the json_agg array above.
        #[derive(Deserialize)]
        struct MigrationRow {
            migration_name: String,
            last_status: Option<String>,
            last_activity: Option<String>,
            checksum: Option<String>,
        }

        let json_str = output.trim();

        // json_agg over an empty set yields SQL NULL, rendered as "null".
        if json_str == "null" || json_str.is_empty() {
            return Ok(Vec::new());
        }

        let rows: Vec<MigrationRow> = serde_json::from_str(json_str).map_err(|e| {
            MigrationError::Database(anyhow::anyhow!(
                "Failed to parse JSON from database (output: '{}'): {}",
                json_str,
                e
            ))
        })?;

        let mut results: Vec<crate::engine::MigrationDbInfo> = rows
            .into_iter()
            .map(|row| {
                // Unrecognized status strings map to None rather than error.
                let status = row
                    .last_status
                    .as_deref()
                    .and_then(MigrationHistoryStatus::from_str);

                crate::engine::MigrationDbInfo {
                    migration_name: row.migration_name,
                    last_status: status,
                    last_activity: row.last_activity,
                    checksum: row.checksum,
                }
            })
            .collect();

        results.sort_by(|a, b| a.migration_name.cmp(&b.migration_name));

        Ok(results)
    }
}
320
/// `AsyncWrite` adapter that appends everything written into a shared
/// in-memory buffer; used to capture child-process stdout in execute_sql.
struct SharedBufWriter(Arc<Mutex<Vec<u8>>>);
323
324impl tokio::io::AsyncWrite for SharedBufWriter {
325 fn poll_write(
326 self: std::pin::Pin<&mut Self>,
327 _cx: &mut std::task::Context<'_>,
328 buf: &[u8],
329 ) -> std::task::Poll<std::io::Result<usize>> {
330 self.0.lock().unwrap().extend_from_slice(buf);
331 std::task::Poll::Ready(Ok(buf.len()))
332 }
333
334 fn poll_flush(
335 self: std::pin::Pin<&mut Self>,
336 _cx: &mut std::task::Context<'_>,
337 ) -> std::task::Poll<std::io::Result<()>> {
338 std::task::Poll::Ready(Ok(()))
339 }
340
341 fn poll_shutdown(
342 self: std::pin::Pin<&mut Self>,
343 _cx: &mut std::task::Context<'_>,
344 ) -> std::task::Poll<std::io::Result<()>> {
345 std::task::Poll::Ready(Ok(()))
346 }
347}
348
/// `Write` adapter that forwards bytes to `inner` while folding them into an
/// xxHash3-128 checksum; used to fingerprint migration SQL as it is streamed.
struct TeeWriter<W: Write> {
    inner: W,
    hasher: xxhash3_128::Hasher,
}
355
356impl<W: Write> TeeWriter<W> {
357 fn new(inner: W) -> Self {
358 Self {
359 inner,
360 hasher: xxhash3_128::Hasher::new(),
361 }
362 }
363
364 fn finish(self) -> (W, u128) {
365 (self.inner, self.hasher.finish_128())
366 }
367}
368
369impl<W: Write> Write for TeeWriter<W> {
370 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
371 self.hasher.write(buf);
372 self.inner.write(buf)
373 }
374
375 fn flush(&mut self) -> std::io::Result<()> {
376 self.inner.flush()
377 }
378}
379
/// Builds the transactional SQL that upserts a row into `<schema>.migration`
/// and appends a corresponding `<schema>.migration_history` entry.
///
/// All textual inputs go through EscapedLiteral/EscapedIdentifier; only the
/// checksum and the interval are spliced in as raw SQL (see notes below).
fn build_record_migration_sql(
    spawn_schema: &str,
    migration_name: &str,
    namespace: &EscapedLiteral,
    status: MigrationStatus,
    activity: MigrationActivity,
    checksum: Option<&str>,
    execution_time: Option<f32>,
    pin_hash: Option<&str>,
    description: Option<&str>,
) -> EscapedQuery {
    let schema_ident = EscapedIdentifier::new(spawn_schema);
    let safe_migration_name = EscapedLiteral::new(migration_name);
    let safe_status = EscapedLiteral::new(status.as_str());
    let safe_activity = EscapedLiteral::new(activity.as_str());
    let safe_description = EscapedLiteral::new(description.unwrap_or(""));
    // Missing checksum becomes decode('', 'hex') => empty bytea.
    // NOTE(review): the hex string is interpolated unescaped via
    // InsecureRawSql — assumes it is hex produced by this module's own
    // hashing; confirm no external input can reach this parameter.
    let checksum_expr = checksum
        .map(|c| format!("decode('{}', 'hex')", c))
        .unwrap_or_else(|| "decode('', 'hex')".to_string());
    let checksum_raw = InsecureRawSql::new(&checksum_expr);
    let safe_pin_hash = pin_hash.map(|h| EscapedLiteral::new(h));

    // f32 seconds rendered as a Postgres interval; defaults to zero when no
    // execution time was measured.
    let duration_interval = execution_time
        .map(|d| InsecureRawSql::new(&format!("INTERVAL '{} second'", d)))
        .unwrap_or_else(|| InsecureRawSql::new("INTERVAL '0 second'"));

    // The ON CONFLICT no-op update lets RETURNING yield the migration_id
    // whether the row was freshly inserted or already existed.
    sql_query!(
        r#"
BEGIN;
WITH inserted_migration AS (
    INSERT INTO {}.migration (name, namespace) VALUES ({}, {})
    ON CONFLICT (name, namespace) DO UPDATE SET name = EXCLUDED.name
    RETURNING migration_id
)
INSERT INTO {}.migration_history (
    migration_id_migration,
    activity_id_activity,
    created_by,
    description,
    status_note,
    status_id_status,
    checksum,
    execution_time,
    pin_hash
)
SELECT
    migration_id,
    {},
    'unused',
    {},
    '',
    {},
    {},
    {},
    {}
FROM inserted_migration;
COMMIT;
"#,
        schema_ident,
        safe_migration_name,
        namespace,
        schema_ident,
        safe_activity,
        safe_description,
        safe_status,
        checksum_raw,
        duration_interval,
        safe_pin_hash,
    )
}
453
impl PSQL {
    /// Brings spawn's own bookkeeping schema up to date by applying any
    /// embedded engine migrations (from `PROJECT_DIR`) that are not yet
    /// recorded under the reserved `spawn` namespace.
    pub async fn update_schema(&self) -> Result<()> {
        let op = operator_from_includedir(&PROJECT_DIR, None)
            .await
            .context("Failed to create operator from included directory")?;

        // Pin to the latest revision; paths are rooted at the embedded dir.
        let pinner = Latest::new("").context("Failed to create Latest pinner")?;
        let pather = FolderPather {
            spawn_folder: "".to_string(),
        };
        let store = Store::new(Box::new(pinner), op.clone(), pather)
            .context("Failed to create store for update_schema")?;

        let available_migrations = store
            .list_migrations()
            .await
            .context("Failed to list migrations")?;

        // On a fresh database the bookkeeping table does not exist yet;
        // treat everything as unapplied instead of querying it.
        let migration_table_exists = self
            .migration_table_exists()
            .await
            .context("Failed checking if migration table exists")?;

        let applied_migrations: HashSet<String> = if migration_table_exists {
            self.get_applied_migrations_set(&self.safe_spawn_namespace())
                .await
                .context("Failed to get applied migrations set")?
        } else {
            HashSet::new()
        };

        // Build a config whose active database is this psql engine so the
        // Migrator renders the embedded migrations against it.
        let mut cfg = crate::config::Config::load("spawn.toml", &op, None)
            .await
            .context("Failed to load config for postgres psql")?;
        let dbengtype = "psql".to_string();
        cfg.database = Some(dbengtype.clone());
        cfg.databases = HashMap::from([(dbengtype, self.db_config.clone())]);

        for migration_path in available_migrations {
            // Last path segment (sans trailing slash) is the migration name.
            let migration_name = migration_path
                .trim_end_matches('/')
                .rsplit('/')
                .next()
                .unwrap_or(&migration_path);

            if applied_migrations.contains(migration_name) {
                continue;
            }

            let migrator = crate::migrator::Migrator::new(&cfg, &migration_name, false);

            // Expose the target schema name to the migration templates.
            let variables = crate::variables::Variables::from_str(
                "json",
                &serde_json::json!({"schema": &self.spawn_schema}).to_string(),
            )?;
            let gen = migrator.generate_streaming(Some(variables)).await?;
            let mut buffer = Vec::new();
            gen.render_to_writer(&mut buffer)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
            let content = String::from_utf8(buffer)?;

            let write_fn: WriterFn =
                Box::new(move |writer: &mut dyn Write| writer.write_all(content.as_bytes()));
            // Races with concurrent processes are tolerated: AlreadyApplied
            // just means another runner got there first.
            match self
                .apply_and_record_migration_v1(
                    migration_name,
                    write_fn,
                    None, self.safe_spawn_namespace(),
                    false, )
                .await
            {
                Ok(_) => {}
                Err(MigrationError::AlreadyApplied { .. }) => {}
                Err(e) => return Err(e.into()),
            }
        }

        Ok(())
    }

    /// Runs `query` through psql and returns the captured stdout as a
    /// string.
    ///
    /// When `format` is given, tuples-only mode is enabled and psql's output
    /// format is set accordingly (e.g. "csv", "unaligned") before the query.
    async fn execute_sql(&self, query: &EscapedQuery, format: Option<&str>) -> Result<String> {
        // Owned copies so the 'static writer closure can capture them.
        let query_str = query.as_str().to_string();
        let format_owned = format.map(|s| s.to_string());

        // Shared buffer captures the child's stdout across the async task.
        let stdout_buf = Arc::new(Mutex::new(Vec::new()));
        let stdout_buf_clone = stdout_buf.clone();

        self.execute_with_writer(
            Box::new(move |writer| {
                if let Some(fmt) = format_owned {
                    writer.write_all(b"\\pset tuples_only on\n")?;
                    writer.write_all(format!("\\pset format {}\n", fmt).as_bytes())?;
                }
                writer.write_all(query_str.as_bytes())?;
                Ok(())
            }),
            Some(Box::new(SharedBufWriter(stdout_buf_clone))),
        )
        .await
        .map_err(|e| anyhow!("SQL execution failed: {}", e))?;

        let buf = stdout_buf.lock().unwrap();
        Ok(String::from_utf8_lossy(&buf).to_string())
    }

    /// True when spawn's `migration` table exists in the spawn schema.
    async fn migration_table_exists(&self) -> Result<bool> {
        self.table_exists("migration").await
    }

    /// True when spawn's `migration_history` table exists in the spawn schema.
    async fn migration_history_table_exists(&self) -> Result<bool> {
        self.table_exists("migration_history").await
    }

    /// Checks `information_schema.tables` for `table_name` within the spawn
    /// schema.
    async fn table_exists(&self, table_name: &str) -> Result<bool> {
        let safe_table_name = EscapedLiteral::new(table_name);
        let query = sql_query!(
            r#"
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = {}
                AND table_name = {}
            );
            "#,
            self.spawn_schema_literal(),
            safe_table_name
        );

        // Postgres renders boolean true as "t" in csv/tuples-only output.
        let output = self.execute_sql(&query, Some("csv")).await?;
        Ok(output.trim() == "t")
    }

    /// Names of migrations already recorded for `namespace` in the spawn
    /// schema's `migration` table.
    async fn get_applied_migrations_set(
        &self,
        namespace: &EscapedLiteral,
    ) -> Result<HashSet<String>> {
        let query = sql_query!(
            "SELECT name FROM {}.migration WHERE namespace = {};",
            self.spawn_schema_ident(),
            namespace,
        );

        let output = self.execute_sql(&query, Some("csv")).await?;
        let mut migrations = HashSet::new();

        // One migration name per csv line; skip blank lines.
        for line in output.lines() {
            let name = line.trim();
            if !name.is_empty() {
                migrations.insert(name.to_string());
            }
        }

        Ok(migrations)
    }

    /// Fetches the most recent history row for `migration_name` in
    /// `namespace`, if any.
    ///
    /// Returns `None` when there is no history, the row is malformed, or the
    /// status value is unrecognized.
    /// NOTE(review): the csv row is split on ','; assumes migration names and
    /// namespaces never contain commas — confirm upstream validation.
    async fn get_migration_status(
        &self,
        migration_name: &str,
        namespace: &EscapedLiteral,
    ) -> Result<Option<ExistingMigrationInfo>> {
        let safe_migration_name = EscapedLiteral::new(migration_name);
        let query = sql_query!(
            r#"
            SELECT m.name, m.namespace, mh.status_id_status, mh.activity_id_activity, encode(mh.checksum, 'hex')
            FROM {}.migration_history mh
            JOIN {}.migration m ON mh.migration_id_migration = m.migration_id
            WHERE m.name = {} AND m.namespace = {}
            ORDER BY mh.migration_history_id DESC
            LIMIT 1;
            "#,
            self.spawn_schema_ident(),
            self.spawn_schema_ident(),
            safe_migration_name,
            namespace
        );

        let output = self.execute_sql(&query, Some("csv")).await?;

        let data_line = output.trim();
        if data_line.is_empty() {
            return Ok(None);
        }

        let parts: Vec<&str> = data_line.split(',').collect();
        if parts.len() < 5 {
            return Ok(None);
        }

        let status = match parts[2].trim() {
            "SUCCESS" => MigrationHistoryStatus::Success,
            "ATTEMPTED" => MigrationHistoryStatus::Attempted,
            "FAILURE" => MigrationHistoryStatus::Failure,
            _ => return Ok(None),
        };

        Ok(Some(ExistingMigrationInfo {
            migration_name: parts[0].trim().to_string(),
            namespace: parts[1].trim().to_string(),
            last_status: status,
            last_activity: parts[3].trim().to_string(),
            checksum: parts[4].trim().to_string(),
        }))
    }

    /// Upserts the migration row and appends a history entry recording
    /// `status`/`activity` for it; SQL is built by
    /// build_record_migration_sql and run through psql.
    async fn record_migration(
        &self,
        migration_name: &str,
        namespace: &EscapedLiteral,
        status: MigrationStatus,
        activity: MigrationActivity,
        checksum: Option<&str>,
        execution_time: Option<f32>,
        pin_hash: Option<&str>,
        description: Option<&str>,
    ) -> MigrationResult<()> {
        let record_query = build_record_migration_sql(
            &self.spawn_schema,
            migration_name,
            namespace,
            status,
            activity,
            checksum,
            execution_time,
            pin_hash,
            description,
        );

        self.execute_with_writer(
            Box::new(move |writer| {
                writer.write_all(record_query.as_str().as_bytes())?;
                Ok(())
            }),
            None,
        )
        .await
        // Fold both engine failure shapes into a database error with context.
        .map_err(|e| match e {
            EngineError::ExecutionFailed { exit_code, stderr } => {
                MigrationError::Database(anyhow!(
                    "Failed to record migration (exit {}): {}",
                    exit_code,
                    stderr
                ))
            }
            EngineError::Io(e) => MigrationError::Database(e.into()),
        })?;

        Ok(())
    }

    /// Applies a migration under an advisory lock and records the outcome.
    ///
    /// Flow: check prior history (skipped when the history table doesn't
    /// exist yet), stream the SQL through psql while checksumming it, then
    /// write a history row. `retry` permits re-running after a prior
    /// ATTEMPTED/FAILURE record.
    async fn apply_and_record_migration_v1(
        &self,
        migration_name: &str,
        write_fn: WriterFn,
        pin_hash: Option<String>,
        namespace: EscapedLiteral,
        retry: bool,
    ) -> MigrationResult<String> {
        let existing_status = if self
            .migration_history_table_exists()
            .await
            .map_err(MigrationError::Database)?
        {
            self.get_migration_status(migration_name, &namespace)
                .await
                .map_err(MigrationError::Database)?
        } else {
            None
        };

        if let Some(info) = existing_status {
            let name = migration_name.to_string();
            let ns = namespace.raw_value().to_string();

            match info.last_status {
                MigrationHistoryStatus::Success => {
                    return Err(MigrationError::AlreadyApplied {
                        name,
                        namespace: ns,
                        info,
                    });
                }
                MigrationHistoryStatus::Attempted | MigrationHistoryStatus::Failure => {
                    if !retry {
                        return Err(MigrationError::PreviousAttemptFailed {
                            name,
                            namespace: ns,
                            status: info.last_status.clone(),
                            info,
                        });
                    }
                }
            }
        }

        let start_time = Instant::now();
        let lock_checksum = migration_lock_key();

        // The checksum is computed inside the writer closure; smuggle it out
        // through a shared cell.
        let checksum_result: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
        let checksum_result_clone = checksum_result.clone();

        let migration_result = self
            .execute_with_writer(
                Box::new(move |writer| {
                    // Session-level advisory lock serializes concurrent
                    // runners; it is released when the psql session ends.
                    writer.write_all(
                        format!(
                            r#"DO $$ BEGIN IF NOT pg_try_advisory_lock({}) THEN RAISE EXCEPTION 'Could not acquire advisory lock'; END IF; END $$;"#,
                            lock_checksum
                        )
                        .as_bytes(),
                    )?;

                    // Tee the migration SQL so the recorded checksum matches
                    // exactly what was streamed to psql.
                    let mut tee_writer = TeeWriter::new(writer);

                    write_fn(&mut tee_writer)?;

                    let (_writer, content_checksum) = tee_writer.finish();
                    let checksum_hex = format!("{:032x}", content_checksum);
                    *checksum_result_clone.lock().unwrap() = Some(checksum_hex);

                    Ok(())
                }),
                None,
            )
            .await;

        let duration = start_time.elapsed().as_secs_f32();
        let checksum_hex = checksum_result.lock().unwrap().clone();

        let (status, migration_error) = match &migration_result {
            Ok(()) => (MigrationStatus::Success, None),
            Err(EngineError::ExecutionFailed { exit_code, stderr }) => {
                // Lock contention is its own error kind and is intentionally
                // not written to migration history.
                if stderr.contains("Could not acquire advisory lock") {
                    return Err(MigrationError::AdvisoryLock(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        stderr.clone(),
                    )));
                }
                (
                    MigrationStatus::Failure,
                    Some(format!("psql exited with code {}: {}", exit_code, stderr)),
                )
            }
            Err(EngineError::Io(e)) => {
                return Err(MigrationError::Database(anyhow!(
                    "IO error running migration: {}",
                    e
                )));
            }
        };

        let record_result = self
            .record_migration(
                migration_name,
                &namespace,
                status,
                MigrationActivity::Apply,
                checksum_hex.as_deref(),
                Some(duration),
                pin_hash.as_deref(),
                None,
            )
            .await;

        // Recording failures take precedence: the caller must know that the
        // history table may not reflect what actually ran.
        if let Err(record_err) = record_result {
            if migration_error.is_none() {
                return Err(MigrationError::NotRecorded {
                    name: migration_name.to_string(),
                    migration_outcome: MigrationStatus::Success,
                    migration_error: None,
                    recording_error: format!("{}", record_err),
                });
            }
            return Err(MigrationError::NotRecorded {
                name: migration_name.to_string(),
                migration_outcome: MigrationStatus::Failure,
                migration_error: migration_error.clone(),
                recording_error: format!("{}", record_err),
            });
        }

        if let Some(err_msg) = migration_error {
            return Err(MigrationError::Database(anyhow!(
                "Migration '{}' failed: {}",
                migration_name,
                err_msg
            )));
        }

        Ok("Migration applied successfully".to_string())
    }
}