// refinery_core/runner.rs

1use siphasher::sip::SipHasher13;
2use time::OffsetDateTime;
3
4use log::error;
5use std::cmp::Ordering;
6use std::collections::VecDeque;
7use std::fmt;
8use std::hash::{Hash, Hasher};
9
10use crate::traits::{sync::migrate as sync_migrate, DEFAULT_MIGRATION_TABLE_NAME};
11use crate::util::{parse_migration_name, SchemaVersion};
12use crate::{AsyncMigrate, Error, Migrate};
13use std::fmt::Formatter;
14
/// The kind of a migration, as encoded by the one-letter prefix of its name.
///
/// Formatting with [`fmt::Display`] yields that prefix (`V` or `U`), while
/// [`fmt::Debug`] yields the variant name (`Versioned` or `Unversioned`).
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Type {
    /// Displayed as the `V` prefix.
    Versioned,
    /// Displayed as the `U` prefix.
    Unversioned,
}

impl fmt::Display for Type {
    /// Writes the single-letter prefix for this migration type.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `write_str` emits the literal as-is, exactly like `write!` with a
        // bare `{}` placeholder did: width/fill flags are not applied.
        f.write_str(match self {
            Type::Versioned => "V",
            Type::Unversioned => "U",
        })
    }
}
42
43/// An enum set that represents the target version up to which refinery should migrate, it is used by [Runner]
44#[derive(Clone, Copy, Debug)]
45pub enum Target {
46    Latest,
47    Version(SchemaVersion),
48    Fake,
49    FakeVersion(SchemaVersion),
50}
51
/// Whether a migration has already been applied on the database.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
enum State {
    /// The migration has been applied on the database.
    Applied,
    /// The migration is yet to be applied on the database.
    Unapplied,
}
60
61/// Represents a schema migration to be run on the database,
62/// this struct is used by the [`embed_migrations!`] macro to gather migration files
63/// and shouldn't be needed by the user
64///
65/// [`embed_migrations!`]: macro.embed_migrations.html
66#[derive(Clone, Debug)]
67#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
68pub struct Migration {
69    state: State,
70    name: String,
71    checksum: u64,
72    version: SchemaVersion,
73    prefix: Type,
74    sql: Option<String>,
75    applied_on: Option<OffsetDateTime>,
76}
77
78impl Migration {
79    /// Create an unapplied migration, name and version are parsed from the input_name,
80    /// which must be named in the format (U|V){1}__{2}.rs where {1} represents the migration version and {2} the name.
81    pub fn unapplied(input_name: &str, sql: &str) -> Result<Migration, Error> {
82        let (prefix, version, name) = parse_migration_name(input_name)?;
83
84        // Previously, `std::collections::hash_map::DefaultHasher` was used
85        // to calculate the checksum and the implementation at that time
86        // was SipHasher13. However, that implementation is not guaranteed:
87        // > The internal algorithm is not specified, and so it and its
88        // > hashes should not be relied upon over releases.
89        // We now explicitly use SipHasher13 to both remain compatible with
90        // existing migrations and prevent breaking from possible future
91        // changes to `DefaultHasher`.
92        let mut hasher = SipHasher13::new();
93        name.hash(&mut hasher);
94        version.hash(&mut hasher);
95        sql.hash(&mut hasher);
96        let checksum = hasher.finish();
97
98        Ok(Migration {
99            state: State::Unapplied,
100            name,
101            version,
102            prefix,
103            sql: Some(sql.into()),
104            applied_on: None,
105            checksum,
106        })
107    }
108
109    // Create a migration from an applied migration on the database
110    pub fn applied(
111        version: SchemaVersion,
112        name: String,
113        applied_on: OffsetDateTime,
114        checksum: u64,
115    ) -> Migration {
116        Migration {
117            state: State::Applied,
118            name,
119            checksum,
120            version,
121            // applied migrations are always versioned
122            prefix: Type::Versioned,
123            sql: None,
124            applied_on: Some(applied_on),
125        }
126    }
127
128    // convert the Unapplied into an Applied Migration
129    pub fn set_applied(&mut self) {
130        self.applied_on = Some(OffsetDateTime::now_utc());
131        self.state = State::Applied;
132    }
133
134    // Get migration sql content
135    pub fn sql(&self) -> Option<&str> {
136        self.sql.as_deref()
137    }
138
139    /// Get the Migration version
140    pub fn version(&self) -> SchemaVersion {
141        self.version
142    }
143
144    /// Get the Prefix
145    pub fn prefix(&self) -> &Type {
146        &self.prefix
147    }
148
149    /// Get the Migration Name
150    pub fn name(&self) -> &str {
151        &self.name
152    }
153
154    /// Get the timestamp from when the Migration was applied. `None` when unapplied.
155    /// Migrations returned from Runner::get_migrations() will always have `None`.
156    pub fn applied_on(&self) -> Option<&OffsetDateTime> {
157        self.applied_on.as_ref()
158    }
159
160    /// Get the Migration checksum. Checksum is formed from the name version and sql of the Migration
161    pub fn checksum(&self) -> u64 {
162        self.checksum
163    }
164}
165
166impl fmt::Display for Migration {
167    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
168        write!(fmt, "{}{}__{}", self.prefix, self.version, self.name)
169    }
170}
171
172impl Eq for Migration {}
173
174impl PartialEq for Migration {
175    fn eq(&self, other: &Migration) -> bool {
176        self.version == other.version
177            && self.name == other.name
178            && self.checksum() == other.checksum()
179    }
180}
181
182impl Ord for Migration {
183    fn cmp(&self, other: &Migration) -> Ordering {
184        self.version.cmp(&other.version)
185    }
186}
187
188impl PartialOrd for Migration {
189    fn partial_cmp(&self, other: &Migration) -> Option<Ordering> {
190        Some(self.cmp(other))
191    }
192}
193
194/// Struct that represents the report of the migration cycle,
195/// a `Report` instance is returned by the [`Runner::run`] and [`Runner::run_async`] methods
196/// via [`Result`]`<Report, Error>`, on case of an [`Error`] during a migration, you can access the `Report` with [`Error.report`]
197///
198/// [`Error`]: struct.Error.html
199/// [`Runner::run`]: struct.Runner.html#method.run
200/// [`Runner::run_async`]: struct.Runner.html#method.run_async
201/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
202/// [`Error.report`]:  struct.Error.html#method.report
203#[derive(Clone, Debug)]
204pub struct Report {
205    applied_migrations: Vec<Migration>,
206}
207
208impl Report {
209    /// Instantiate a new Report
210    pub fn new(applied_migrations: Vec<Migration>) -> Report {
211        Report { applied_migrations }
212    }
213
214    /// Retrieves the list of applied `Migration` of the migration cycle
215    pub fn applied_migrations(&self) -> &Vec<Migration> {
216        &self.applied_migrations
217    }
218}
219
220/// Struct that represents the entrypoint to run the migrations,
221/// an instance of this struct is returned by the [`embed_migrations!`] macro.
222/// `Runner` should not need to be instantiated manually
223///
224/// [`embed_migrations!`]: macro.embed_migrations.html
225pub struct Runner {
226    grouped: bool,
227    abort_divergent: bool,
228    abort_missing: bool,
229    migrations: Vec<Migration>,
230    target: Target,
231    migration_table_name: String,
232}
233
234impl Runner {
235    /// instantiate a new Runner
236    pub fn new(migrations: &[Migration]) -> Runner {
237        Runner {
238            grouped: false,
239            target: Target::Latest,
240            abort_divergent: true,
241            abort_missing: true,
242            migrations: migrations.to_vec(),
243            migration_table_name: DEFAULT_MIGRATION_TABLE_NAME.into(),
244        }
245    }
246
247    /// Get the gathered migrations.
248    pub fn get_migrations(&self) -> &Vec<Migration> {
249        &self.migrations
250    }
251
252    /// Set the target version up to which refinery should migrate, Latest migrates to the latest version available
253    /// Version migrates to a user provided version, a Version with a higher version than the latest will be ignored,
254    /// and Fake doesn't actually run any migration, just creates and updates refinery's schema migration table
255    /// by default this is set to Latest
256    pub fn set_target(self, target: Target) -> Runner {
257        Runner { target, ..self }
258    }
259
260    /// Set true if all migrations should be grouped and run in a single transaction.
261    /// by default this is set to false, each migration runs on their own transaction
262    ///
263    /// # Note
264    ///
265    /// set_grouped won't probably work on MySQL Databases as MySQL lacks support for transactions around schema alteration operations,
266    /// meaning that if a migration fails to apply you will have to manually unpick the changes in order to try again (it’s impossible to roll back to an earlier point).
267    pub fn set_grouped(self, grouped: bool) -> Runner {
268        Runner { grouped, ..self }
269    }
270
271    /// Set true if migration process should abort if divergent migrations are found
272    /// i.e. applied migrations with the same version but different name or checksum from the ones on the filesystem.
273    /// by default this is set to true
274    pub fn set_abort_divergent(self, abort_divergent: bool) -> Runner {
275        Runner {
276            abort_divergent,
277            ..self
278        }
279    }
280
281    /// Set true if migration process should abort if missing migrations are found
282    /// i.e. applied migrations that are not found on the filesystem,
283    /// or migrations found on filesystem with a version inferior to the last one applied but not applied.
284    /// by default this is set to true
285    pub fn set_abort_missing(self, abort_missing: bool) -> Runner {
286        Runner {
287            abort_missing,
288            ..self
289        }
290    }
291
292    /// Queries the database for the last applied migration, returns None if there aren't applied Migrations
293    pub fn get_last_applied_migration<C>(&self, conn: &'_ mut C) -> Result<Option<Migration>, Error>
294    where
295        C: Migrate,
296    {
297        Migrate::get_last_applied_migration(conn, &self.migration_table_name)
298    }
299
300    /// Queries the database asynchronously for the last applied migration, returns None if there aren't applied Migrations
301    pub async fn get_last_applied_migration_async<C>(
302        &self,
303        conn: &mut C,
304    ) -> Result<Option<Migration>, Error>
305    where
306        C: AsyncMigrate + Send,
307    {
308        AsyncMigrate::get_last_applied_migration(conn, &self.migration_table_name).await
309    }
310
311    /// Queries the database for all previous applied migrations
312    pub fn get_applied_migrations<C>(&self, conn: &'_ mut C) -> Result<Vec<Migration>, Error>
313    where
314        C: Migrate,
315    {
316        Migrate::get_applied_migrations(conn, &self.migration_table_name)
317    }
318
319    /// Queries the database asynchronously for all previous applied migrations
320    pub async fn get_applied_migrations_async<C>(
321        &self,
322        conn: &mut C,
323    ) -> Result<Vec<Migration>, Error>
324    where
325        C: AsyncMigrate + Send,
326    {
327        AsyncMigrate::get_applied_migrations(conn, &self.migration_table_name).await
328    }
329
330    /// Set the table name to use for the migrations table. The default name is `refinery_schema_history`
331    ///
332    /// ### Warning
333    /// Changing this can be disastrous for your database. You should verify that the migrations table has the same
334    /// name as the name you specify here, if this is changed on an existing project.
335    ///
336    /// # Panics
337    ///
338    /// If the provided `migration_table_name` is empty
339    pub fn set_migration_table_name<S: AsRef<str>>(
340        &mut self,
341        migration_table_name: S,
342    ) -> &mut Self {
343        if migration_table_name.as_ref().is_empty() {
344            panic!("Migration table name must not be empty");
345        }
346
347        self.migration_table_name = migration_table_name.as_ref().to_string();
348        self
349    }
350
351    /// Creates an iterator over pending migrations, applying each before returning
352    /// the result from `next()`. If a migration fails, the iterator will return that
353    /// result and further calls to `next()` will return `None`.
354    pub fn run_iter<C>(
355        self,
356        connection: &mut C,
357    ) -> impl Iterator<Item = Result<Migration, Error>> + '_
358    where
359        C: Migrate,
360    {
361        RunIterator::new(self, connection)
362    }
363
364    /// Runs the Migrations in the supplied database connection
365    pub fn run<C>(&self, connection: &mut C) -> Result<Report, Error>
366    where
367        C: Migrate,
368    {
369        Migrate::migrate(
370            connection,
371            &self.migrations,
372            self.abort_divergent,
373            self.abort_missing,
374            self.grouped,
375            self.target,
376            &self.migration_table_name,
377        )
378    }
379
380    /// Runs the Migrations asynchronously in the supplied database connection
381    pub async fn run_async<C>(&self, connection: &mut C) -> Result<Report, Error>
382    where
383        C: AsyncMigrate + Send,
384    {
385        AsyncMigrate::migrate(
386            connection,
387            &self.migrations,
388            self.abort_divergent,
389            self.abort_missing,
390            self.grouped,
391            self.target,
392            &self.migration_table_name,
393        )
394        .await
395    }
396}
397
398pub struct RunIterator<'a, C> {
399    connection: &'a mut C,
400    target: Target,
401    migration_table_name: String,
402    items: VecDeque<Migration>,
403    failed: bool,
404}
405impl<'a, C> RunIterator<'a, C>
406where
407    C: Migrate,
408{
409    pub(crate) fn new(runner: Runner, connection: &'a mut C) -> RunIterator<'a, C> {
410        RunIterator {
411            items: VecDeque::from(
412                Migrate::get_unapplied_migrations(
413                    connection,
414                    &runner.migrations,
415                    runner.abort_divergent,
416                    runner.abort_missing,
417                    &runner.migration_table_name,
418                )
419                .unwrap(),
420            ),
421            connection,
422            target: runner.target,
423            migration_table_name: runner.migration_table_name.clone(),
424            failed: false,
425        }
426    }
427}
428impl<C> Iterator for RunIterator<'_, C>
429where
430    C: Migrate,
431{
432    type Item = Result<Migration, Error>;
433
434    fn next(&mut self) -> Option<Self::Item> {
435        match self.failed {
436            true => None,
437            false => self.items.pop_front().and_then(|migration| {
438                sync_migrate(
439                    self.connection,
440                    vec![migration],
441                    self.target,
442                    &self.migration_table_name,
443                    false,
444                )
445                .map(|r| r.applied_migrations.first().cloned())
446                .map_err(|e| {
447                    error!("migration failed: {e:?}");
448                    self.failed = true;
449                    e
450                })
451                .transpose()
452            }),
453        }
454    }
455}