1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use crate::acquire::Acquire;
use crate::migrate::{Migrate, MigrateError, Migration, MigrationSource};
use std::borrow::Cow;
use std::ops::Deref;
use std::slice;
#[derive(Debug)]
pub struct Migrator {
pub migrations: Cow<'static, [Migration]>,
}
impl Migrator {
/// Creates a new instance with the given source.
///
/// # Examples
///
/// ```rust,no_run
/// # use sqlx_core::migrate::MigrateError;
/// # fn main() -> Result<(), MigrateError> {
/// # sqlx_rt::block_on(async move {
/// # use sqlx_core::migrate::Migrator;
/// use std::path::Path;
///
/// // Read migrations from a local folder: ./migrations
/// let m = Migrator::new(Path::new("./migrations")).await?;
/// # Ok(())
/// # })
/// # }
/// ```
pub async fn new<'s, S>(source: S) -> Result<Self, MigrateError>
where
S: MigrationSource<'s>,
{
Ok(Self {
migrations: Cow::Owned(source.resolve().await.map_err(MigrateError::Source)?),
})
}
/// Get an iterator over all known migrations.
pub fn iter(&self) -> slice::Iter<'_, Migration> {
self.migrations.iter()
}
/// Run any pending migrations against the database; and, validate previously applied migrations
/// against the current migration source to detect accidental changes in previously-applied migrations.
///
/// # Examples
///
/// ```rust,no_run
/// # use sqlx_core::migrate::MigrateError;
/// # #[cfg(feature = "sqlite")]
/// # fn main() -> Result<(), MigrateError> {
/// # sqlx_rt::block_on(async move {
/// # use sqlx_core::migrate::Migrator;
/// let m = Migrator::new(std::path::Path::new("./migrations")).await?;
/// let pool = sqlx_core::sqlite::SqlitePoolOptions::new().connect("sqlite::memory:").await?;
/// m.run(&pool).await
/// # })
/// # }
/// ```
pub async fn run<'a, A>(&self, migrator: A) -> Result<(), MigrateError>
where
A: Acquire<'a>,
<A::Connection as Deref>::Target: Migrate,
{
let mut conn = migrator.acquire().await?;
// lock the database for exclusive access by the migrator
conn.lock().await?;
// creates [_migrations] table only if needed
// eventually this will likely migrate previous versions of the table
conn.ensure_migrations_table().await?;
let (version, dirty) = conn.version().await?.unwrap_or((0, false));
if dirty {
return Err(MigrateError::Dirty(version));
}
for migration in self.iter() {
if migration.version > version {
conn.apply(migration).await?;
} else {
conn.validate(migration).await?;
}
}
// unlock the migrator to allow other migrators to run
// but do nothing as we already migrated
conn.unlock().await?;
Ok(())
}
}