1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use crate::acquire::Acquire;
use crate::migrate::{Migrate, MigrateError, Migration, MigrationSource};
use std::borrow::Cow;
use std::ops::Deref;
use std::slice;

/// A collection of [`Migration`]s that can be run against a database.
///
/// Build one with [`Migrator::new`] from a [`MigrationSource`], then call
/// [`Migrator::run`] to apply the migrations through a database connection.
#[derive(Debug)]
pub struct Migrator {
    /// The migrations known to this migrator, either borrowed from `'static`
    /// data or owned (as produced by [`Migrator::new`]).
    pub migrations: Cow<'static, [Migration]>,
}

impl Migrator {
    /// Creates a new instance with the given source.
    ///
    /// The source is resolved eagerly and the resulting migrations are stored
    /// as owned data.
    ///
    /// # Errors
    ///
    /// Returns [`MigrateError::Source`] if the source fails to resolve.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use sqlx_core::migrate::MigrateError;
    /// # fn main() -> Result<(), MigrateError> {
    /// # sqlx_rt::block_on(async move {
    /// # use sqlx_core::migrate::Migrator;
    /// use std::path::Path;
    ///
    /// // Read migrations from a local folder: ./migrations
    /// let m = Migrator::new(Path::new("./migrations")).await?;
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    pub async fn new<'s, S>(source: S) -> Result<Self, MigrateError>
    where
        S: MigrationSource<'s>,
    {
        let migrations = source.resolve().await.map_err(MigrateError::Source)?;

        Ok(Self {
            migrations: Cow::Owned(migrations),
        })
    }

    /// Get an iterator over all known migrations, in the order they are stored.
    pub fn iter(&self) -> slice::Iter<'_, Migration> {
        self.migrations.iter()
    }

    /// Run any pending migrations against the database; and, validate previously applied migrations
    /// against the current migration source to detect accidental changes in previously-applied migrations.
    ///
    /// A connection is acquired from `migrator` and locked for the duration of the run.
    /// Every migration whose version is greater than the version reported by the database
    /// is applied; every other migration is validated.
    ///
    /// The lock is released before returning, whether or not the run succeeded.
    ///
    /// # Errors
    ///
    /// * [`MigrateError::Dirty`] if the database reports its current version as dirty.
    /// * Any error produced while acquiring the connection, locking, preparing the
    ///   migrations table, or applying / validating a migration.
    /// * If the run itself succeeded but releasing the lock failed, the unlock error.
    ///   When both fail, the error from the run takes precedence.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use sqlx_core::migrate::MigrateError;
    /// # #[cfg(feature = "sqlite")]
    /// # fn main() -> Result<(), MigrateError> {
    /// #     sqlx_rt::block_on(async move {
    /// # use sqlx_core::migrate::Migrator;
    /// let m = Migrator::new(std::path::Path::new("./migrations")).await?;
    /// let pool = sqlx_core::sqlite::SqlitePoolOptions::new().connect("sqlite::memory:").await?;
    /// m.run(&pool).await
    /// #     })
    /// # }
    /// ```
    pub async fn run<'a, A>(&self, migrator: A) -> Result<(), MigrateError>
    where
        A: Acquire<'a>,
        <A::Connection as Deref>::Target: Migrate,
    {
        let mut conn = migrator.acquire().await?;

        // lock the database for exclusive access by the migrator;
        // if this fails there is nothing to release, so bail out directly
        conn.lock().await?;

        // Everything that can fail while the lock is held runs inside this block so that
        // an early exit only leaves the block, not the function, and the unlock below
        // is always reached.
        let outcome = async {
            // creates [_migrations] table only if needed
            // eventually this will likely migrate previous versions of the table
            conn.ensure_migrations_table().await?;

            // no recorded version is treated as version 0, not dirty
            let (version, dirty) = conn.version().await?.unwrap_or((0, false));

            if dirty {
                return Err(MigrateError::Dirty(version));
            }

            for migration in self.iter() {
                if migration.version > version {
                    conn.apply(migration).await?;
                } else {
                    conn.validate(migration).await?;
                }
            }

            Ok::<(), MigrateError>(())
        }
        .await;

        // unlock the migrator to allow other migrators to run, even when the run failed
        let unlocked = conn.unlock().await;

        // report the migration failure first; only surface an unlock failure
        // when the run itself succeeded
        outcome.and(unlocked)
    }
}