refinery_core/
runner.rs

1use siphasher::sip::SipHasher13;
2use time::OffsetDateTime;
3
4use log::error;
5use std::cmp::Ordering;
6use std::collections::VecDeque;
7use std::fmt;
8use std::hash::{Hash, Hasher};
9
10use crate::traits::{sync::migrate as sync_migrate, DEFAULT_MIGRATION_TABLE_NAME};
11use crate::util::parse_migration_name;
12use crate::{AsyncMigrate, Error, Migrate};
13use std::fmt::Formatter;
14
15/// An enum set that represents the type of the Migration
16#[derive(Clone, PartialEq)]
17pub enum Type {
18    Versioned,
19    Unversioned,
20}
21
22impl fmt::Display for Type {
23    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
24        let version_type = match self {
25            Type::Versioned => "V",
26            Type::Unversioned => "U",
27        };
28        write!(f, "{}", version_type)
29    }
30}
31
32impl fmt::Debug for Type {
33    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
34        let version_type = match self {
35            Type::Versioned => "Versioned",
36            Type::Unversioned => "Unversioned",
37        };
38        write!(f, "{}", version_type)
39    }
40}
41
42/// An enum set that represents the target version up to which refinery should migrate, it is used by [Runner]
43#[derive(Clone, Copy, Debug)]
44pub enum Target {
45    Latest,
46    Version(u32),
47    Fake,
48    FakeVersion(u32),
49}
50
51// an Enum set that represents the state of the migration: Applied on the database,
52// or Unapplied yet to be applied on the database
53#[derive(Clone, Debug)]
54enum State {
55    Applied,
56    Unapplied,
57}
58
59/// Represents a schema migration to be run on the database,
60/// this struct is used by the [`embed_migrations!`] macro to gather migration files
61/// and shouldn't be needed by the user
62///
63/// [`embed_migrations!`]: macro.embed_migrations.html
64#[derive(Clone, Debug)]
65pub struct Migration {
66    state: State,
67    name: String,
68    checksum: u64,
69    version: i32,
70    prefix: Type,
71    sql: Option<String>,
72    applied_on: Option<OffsetDateTime>,
73}
74
75impl Migration {
76    /// Create an unapplied migration, name and version are parsed from the input_name,
77    /// which must be named in the format (U|V){1}__{2}.rs where {1} represents the migration version and {2} the name.
78    pub fn unapplied(input_name: &str, sql: &str) -> Result<Migration, Error> {
79        let (prefix, version, name) = parse_migration_name(input_name)?;
80
81        // Previously, `std::collections::hash_map::DefaultHasher` was used
82        // to calculate the checksum and the implementation at that time
83        // was SipHasher13. However, that implementation is not guaranteed:
84        // > The internal algorithm is not specified, and so it and its
85        // > hashes should not be relied upon over releases.
86        // We now explicitly use SipHasher13 to both remain compatible with
87        // existing migrations and prevent breaking from possible future
88        // changes to `DefaultHasher`.
89        let mut hasher = SipHasher13::new();
90        name.hash(&mut hasher);
91        version.hash(&mut hasher);
92        sql.hash(&mut hasher);
93        let checksum = hasher.finish();
94
95        Ok(Migration {
96            state: State::Unapplied,
97            name,
98            version,
99            prefix,
100            sql: Some(sql.into()),
101            applied_on: None,
102            checksum,
103        })
104    }
105
106    // Create a migration from an applied migration on the database
107    pub fn applied(
108        version: i32,
109        name: String,
110        applied_on: OffsetDateTime,
111        checksum: u64,
112    ) -> Migration {
113        Migration {
114            state: State::Applied,
115            name,
116            checksum,
117            version,
118            // applied migrations are always versioned
119            prefix: Type::Versioned,
120            sql: None,
121            applied_on: Some(applied_on),
122        }
123    }
124
125    // convert the Unapplied into an Applied Migration
126    pub fn set_applied(&mut self) {
127        self.applied_on = Some(OffsetDateTime::now_utc());
128        self.state = State::Applied;
129    }
130
131    // Get migration sql content
132    pub fn sql(&self) -> Option<&str> {
133        self.sql.as_deref()
134    }
135
136    /// Get the Migration version
137    pub fn version(&self) -> u32 {
138        self.version as u32
139    }
140
141    /// Get the Prefix
142    pub fn prefix(&self) -> &Type {
143        &self.prefix
144    }
145
146    /// Get the Migration Name
147    pub fn name(&self) -> &str {
148        &self.name
149    }
150
151    /// Get the timestamp from when the Migration was applied. `None` when unapplied.
152    /// Migrations returned from Runner::get_migrations() will always have `None`.
153    pub fn applied_on(&self) -> Option<&OffsetDateTime> {
154        self.applied_on.as_ref()
155    }
156
157    /// Get the Migration checksum. Checksum is formed from the name version and sql of the Migration
158    pub fn checksum(&self) -> u64 {
159        self.checksum
160    }
161}
162
163impl fmt::Display for Migration {
164    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
165        write!(fmt, "{}{}__{}", self.prefix, self.version, self.name)
166    }
167}
168
169impl Eq for Migration {}
170
171impl PartialEq for Migration {
172    fn eq(&self, other: &Migration) -> bool {
173        self.version == other.version
174            && self.name == other.name
175            && self.checksum() == other.checksum()
176    }
177}
178
179impl Ord for Migration {
180    fn cmp(&self, other: &Migration) -> Ordering {
181        self.version.cmp(&other.version)
182    }
183}
184
185impl PartialOrd for Migration {
186    fn partial_cmp(&self, other: &Migration) -> Option<Ordering> {
187        Some(self.cmp(other))
188    }
189}
190
191/// Struct that represents the report of the migration cycle,
192/// a `Report` instance is returned by the [`Runner::run`] and [`Runner::run_async`] methods
193/// via [`Result`]`<Report, Error>`, on case of an [`Error`] during a migration, you can access the `Report` with [`Error.report`]
194///
195/// [`Error`]: struct.Error.html
196/// [`Runner::run`]: struct.Runner.html#method.run
197/// [`Runner::run_async`]: struct.Runner.html#method.run_async
198/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
199/// [`Error.report`]:  struct.Error.html#method.report
200#[derive(Clone, Debug)]
201pub struct Report {
202    applied_migrations: Vec<Migration>,
203}
204
205impl Report {
206    /// Instantiate a new Report
207    pub fn new(applied_migrations: Vec<Migration>) -> Report {
208        Report { applied_migrations }
209    }
210
211    /// Retrieves the list of applied `Migration` of the migration cycle
212    pub fn applied_migrations(&self) -> &Vec<Migration> {
213        &self.applied_migrations
214    }
215}
216
217/// Struct that represents the entrypoint to run the migrations,
218/// an instance of this struct is returned by the [`embed_migrations!`] macro.
219/// `Runner` should not need to be instantiated manually
220///
221/// [`embed_migrations!`]: macro.embed_migrations.html
222pub struct Runner {
223    grouped: bool,
224    abort_divergent: bool,
225    abort_missing: bool,
226    migrations: Vec<Migration>,
227    target: Target,
228    migration_table_name: String,
229}
230
231impl Runner {
232    /// instantiate a new Runner
233    pub fn new(migrations: &[Migration]) -> Runner {
234        Runner {
235            grouped: false,
236            target: Target::Latest,
237            abort_divergent: true,
238            abort_missing: true,
239            migrations: migrations.to_vec(),
240            migration_table_name: DEFAULT_MIGRATION_TABLE_NAME.into(),
241        }
242    }
243
244    /// Get the gathered migrations.
245    pub fn get_migrations(&self) -> &Vec<Migration> {
246        &self.migrations
247    }
248
249    /// Set the target version up to which refinery should migrate, Latest migrates to the latest version available
250    /// Version migrates to a user provided version, a Version with a higher version than the latest will be ignored,
251    /// and Fake doesn't actually run any migration, just creates and updates refinery's schema migration table
252    /// by default this is set to Latest
253    pub fn set_target(self, target: Target) -> Runner {
254        Runner { target, ..self }
255    }
256
257    /// Set true if all migrations should be grouped and run in a single transaction.
258    /// by default this is set to false, each migration runs on their own transaction
259    ///
260    /// # Note
261    ///
262    /// set_grouped won't probably work on MySQL Databases as MySQL lacks support for transactions around schema alteration operations,
263    /// meaning that if a migration fails to apply you will have to manually unpick the changes in order to try again (it’s impossible to roll back to an earlier point).
264    pub fn set_grouped(self, grouped: bool) -> Runner {
265        Runner { grouped, ..self }
266    }
267
268    /// Set true if migration process should abort if divergent migrations are found
269    /// i.e. applied migrations with the same version but different name or checksum from the ones on the filesystem.
270    /// by default this is set to true
271    pub fn set_abort_divergent(self, abort_divergent: bool) -> Runner {
272        Runner {
273            abort_divergent,
274            ..self
275        }
276    }
277
278    /// Set true if migration process should abort if missing migrations are found
279    /// i.e. applied migrations that are not found on the filesystem,
280    /// or migrations found on filesystem with a version inferior to the last one applied but not applied.
281    /// by default this is set to true
282    pub fn set_abort_missing(self, abort_missing: bool) -> Runner {
283        Runner {
284            abort_missing,
285            ..self
286        }
287    }
288
289    /// Queries the database for the last applied migration, returns None if there aren't applied Migrations
290    pub fn get_last_applied_migration<C>(&self, conn: &'_ mut C) -> Result<Option<Migration>, Error>
291    where
292        C: Migrate,
293    {
294        Migrate::get_last_applied_migration(conn, &self.migration_table_name)
295    }
296
297    /// Queries the database asynchronously for the last applied migration, returns None if there aren't applied Migrations
298    pub async fn get_last_applied_migration_async<C>(
299        &self,
300        conn: &mut C,
301    ) -> Result<Option<Migration>, Error>
302    where
303        C: AsyncMigrate + Send,
304    {
305        AsyncMigrate::get_last_applied_migration(conn, &self.migration_table_name).await
306    }
307
308    /// Queries the database for all previous applied migrations
309    pub fn get_applied_migrations<C>(&self, conn: &'_ mut C) -> Result<Vec<Migration>, Error>
310    where
311        C: Migrate,
312    {
313        Migrate::get_applied_migrations(conn, &self.migration_table_name)
314    }
315
316    /// Queries the database asynchronously for all previous applied migrations
317    pub async fn get_applied_migrations_async<C>(
318        &self,
319        conn: &mut C,
320    ) -> Result<Vec<Migration>, Error>
321    where
322        C: AsyncMigrate + Send,
323    {
324        AsyncMigrate::get_applied_migrations(conn, &self.migration_table_name).await
325    }
326
327    /// Set the table name to use for the migrations table. The default name is `refinery_schema_history`
328    ///
329    /// ### Warning
330    /// Changing this can be disastrous for your database. You should verify that the migrations table has the same
331    /// name as the name you specify here, if this is changed on an existing project.
332    ///
333    /// # Panics
334    ///
335    /// If the provided `migration_table_name` is empty
336    pub fn set_migration_table_name<S: AsRef<str>>(
337        &mut self,
338        migration_table_name: S,
339    ) -> &mut Self {
340        if migration_table_name.as_ref().is_empty() {
341            panic!("Migration table name must not be empty");
342        }
343
344        self.migration_table_name = migration_table_name.as_ref().to_string();
345        self
346    }
347
348    /// Creates an iterator over pending migrations, applying each before returning
349    /// the result from `next()`. If a migration fails, the iterator will return that
350    /// result and further calls to `next()` will return `None`.
351    pub fn run_iter<C>(
352        self,
353        connection: &mut C,
354    ) -> impl Iterator<Item = Result<Migration, Error>> + '_
355    where
356        C: Migrate,
357    {
358        RunIterator::new(self, connection)
359    }
360
361    /// Runs the Migrations in the supplied database connection
362    pub fn run<C>(&self, connection: &mut C) -> Result<Report, Error>
363    where
364        C: Migrate,
365    {
366        Migrate::migrate(
367            connection,
368            &self.migrations,
369            self.abort_divergent,
370            self.abort_missing,
371            self.grouped,
372            self.target,
373            &self.migration_table_name,
374        )
375    }
376
377    /// Runs the Migrations asynchronously in the supplied database connection
378    pub async fn run_async<C>(&self, connection: &mut C) -> Result<Report, Error>
379    where
380        C: AsyncMigrate + Send,
381    {
382        AsyncMigrate::migrate(
383            connection,
384            &self.migrations,
385            self.abort_divergent,
386            self.abort_missing,
387            self.grouped,
388            self.target,
389            &self.migration_table_name,
390        )
391        .await
392    }
393}
394
395pub struct RunIterator<'a, C> {
396    connection: &'a mut C,
397    target: Target,
398    migration_table_name: String,
399    items: VecDeque<Migration>,
400    failed: bool,
401}
402impl<'a, C> RunIterator<'a, C>
403where
404    C: Migrate,
405{
406    pub(crate) fn new(runner: Runner, connection: &'a mut C) -> RunIterator<'a, C> {
407        RunIterator {
408            items: VecDeque::from(
409                Migrate::get_unapplied_migrations(
410                    connection,
411                    &runner.migrations,
412                    runner.abort_divergent,
413                    runner.abort_missing,
414                    &runner.migration_table_name,
415                )
416                .unwrap(),
417            ),
418            connection,
419            target: runner.target,
420            migration_table_name: runner.migration_table_name.clone(),
421            failed: false,
422        }
423    }
424}
425impl<C> Iterator for RunIterator<'_, C>
426where
427    C: Migrate,
428{
429    type Item = Result<Migration, Error>;
430
431    fn next(&mut self) -> Option<Self::Item> {
432        match self.failed {
433            true => None,
434            false => self.items.pop_front().and_then(|migration| {
435                sync_migrate(
436                    self.connection,
437                    vec![migration],
438                    self.target,
439                    &self.migration_table_name,
440                    false,
441                )
442                .map(|r| r.applied_migrations.first().cloned())
443                .map_err(|e| {
444                    error!("migration failed: {e:?}");
445                    self.failed = true;
446                    e
447                })
448                .transpose()
449            }),
450        }
451    }
452}