trailbase_refinery/
runner.rs

1use siphasher::sip::SipHasher13;
2use time::OffsetDateTime;
3
4use log::error;
5use std::cmp::Ordering;
6use std::collections::VecDeque;
7use std::fmt;
8use std::hash::{Hash, Hasher};
9
10use crate::traits::{DEFAULT_MIGRATION_TABLE_NAME, sync::migrate as sync_migrate};
11use crate::util::parse_migration_name;
12use crate::{AsyncMigrate, Error, Migrate};
13use std::fmt::Formatter;
14
15/// An enum set that represents the type of the Migration
16#[derive(Clone, PartialEq)]
17pub enum Type {
18  Versioned,
19  Unversioned,
20}
21
22impl fmt::Display for Type {
23  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
24    let version_type = match self {
25      Type::Versioned => "V",
26      Type::Unversioned => "U",
27    };
28    write!(f, "{}", version_type)
29  }
30}
31
32impl fmt::Debug for Type {
33  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
34    let version_type = match self {
35      Type::Versioned => "Versioned",
36      Type::Unversioned => "Unversioned",
37    };
38    write!(f, "{}", version_type)
39  }
40}
41
42/// An enum set that represents the target version up to which refinery should migrate, it is used
43/// by [Runner]
44#[derive(Clone, Copy, Debug)]
45pub enum Target {
46  Latest,
47  Version(u32),
48  Fake,
49  FakeVersion(u32),
50}
51
52// an Enum set that represents the state of the migration: Applied on the database,
53// or Unapplied yet to be applied on the database
54#[derive(Clone, Debug)]
55enum State {
56  Applied,
57  Unapplied,
58}
59
60/// Represents a schema migration to be run on the database,
61/// this struct is used by the [`embed_migrations!`] macro to gather migration files
62/// and shouldn't be needed by the user
63///
64/// [`embed_migrations!`]: macro.embed_migrations.html
65#[derive(Clone, Debug)]
66pub struct Migration {
67  state: State,
68  name: String,
69  checksum: u64,
70  version: i32,
71  prefix: Type,
72  sql: Option<String>,
73  applied_on: Option<OffsetDateTime>,
74}
75
76impl Migration {
77  /// Create an unapplied migration, name and version are parsed from the input_name,
78  /// which must be named in the format (U|V){1}__{2}.rs where {1} represents the migration version
79  /// and {2} the name.
80  pub fn unapplied(input_name: &str, sql: &str) -> Result<Migration, Error> {
81    let (prefix, version, name) = parse_migration_name(input_name)?;
82
83    // Previously, `std::collections::hash_map::DefaultHasher` was used
84    // to calculate the checksum and the implementation at that time
85    // was SipHasher13. However, that implementation is not guaranteed:
86    // > The internal algorithm is not specified, and so it and its
87    // > hashes should not be relied upon over releases.
88    // We now explicitly use SipHasher13 to both remain compatible with
89    // existing migrations and prevent breaking from possible future
90    // changes to `DefaultHasher`.
91    let mut hasher = SipHasher13::new();
92    name.hash(&mut hasher);
93    version.hash(&mut hasher);
94    sql.hash(&mut hasher);
95    let checksum = hasher.finish();
96
97    Ok(Migration {
98      state: State::Unapplied,
99      name,
100      version,
101      prefix,
102      sql: Some(sql.into()),
103      applied_on: None,
104      checksum,
105    })
106  }
107
108  // Create a migration from an applied migration on the database
109  pub fn applied(
110    version: i32,
111    name: String,
112    applied_on: OffsetDateTime,
113    checksum: u64,
114  ) -> Migration {
115    Migration {
116      state: State::Applied,
117      name,
118      checksum,
119      version,
120      // applied migrations are always versioned
121      prefix: Type::Versioned,
122      sql: None,
123      applied_on: Some(applied_on),
124    }
125  }
126
127  // convert the Unapplied into an Applied Migration
128  pub fn set_applied(&mut self) {
129    self.applied_on = Some(OffsetDateTime::now_utc());
130    self.state = State::Applied;
131  }
132
133  // Get migration sql content
134  pub fn sql(&self) -> Option<&str> {
135    self.sql.as_deref()
136  }
137
138  /// Get the Migration version
139  pub fn version(&self) -> u32 {
140    self.version as u32
141  }
142
143  /// Get the Prefix
144  pub fn prefix(&self) -> &Type {
145    &self.prefix
146  }
147
148  /// Get the Migration Name
149  pub fn name(&self) -> &str {
150    &self.name
151  }
152
153  /// Get the timestamp from when the Migration was applied. `None` when unapplied.
154  /// Migrations returned from Runner::get_migrations() will always have `None`.
155  pub fn applied_on(&self) -> Option<&OffsetDateTime> {
156    self.applied_on.as_ref()
157  }
158
159  /// Get the Migration checksum. Checksum is formed from the name version and sql of the Migration
160  pub fn checksum(&self) -> u64 {
161    self.checksum
162  }
163}
164
165impl fmt::Display for Migration {
166  fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
167    write!(fmt, "{}{}__{}", self.prefix, self.version, self.name)
168  }
169}
170
171impl Eq for Migration {}
172
173impl PartialEq for Migration {
174  fn eq(&self, other: &Migration) -> bool {
175    self.version == other.version && self.name == other.name && self.checksum() == other.checksum()
176  }
177}
178
179impl Ord for Migration {
180  fn cmp(&self, other: &Migration) -> Ordering {
181    self.version.cmp(&other.version)
182  }
183}
184
185impl PartialOrd for Migration {
186  fn partial_cmp(&self, other: &Migration) -> Option<Ordering> {
187    Some(self.cmp(other))
188  }
189}
190
191/// Struct that represents the report of the migration cycle,
192/// a `Report` instance is returned by the [`Runner::run`] and [`Runner::run_async`] methods
193/// via [`Result`]`<Report, Error>`, on case of an [`Error`] during a migration, you can access the
194/// `Report` with [`Error.report`]
195///
196/// [`Error`]: struct.Error.html
197/// [`Runner::run`]: struct.Runner.html#method.run
198/// [`Runner::run_async`]: struct.Runner.html#method.run_async
199/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
200/// [`Error.report`]:  struct.Error.html#method.report
201#[derive(Clone, Debug)]
202pub struct Report {
203  applied_migrations: Vec<Migration>,
204}
205
206impl Report {
207  /// Instantiate a new Report
208  pub(crate) fn new(applied_migrations: Vec<Migration>) -> Report {
209    Report { applied_migrations }
210  }
211
212  /// Retrieves the list of applied `Migration` of the migration cycle
213  pub fn applied_migrations(&self) -> &Vec<Migration> {
214    &self.applied_migrations
215  }
216}
217
218/// Struct that represents the entrypoint to run the migrations,
219/// an instance of this struct is returned by the [`embed_migrations!`] macro.
220/// `Runner` should not need to be instantiated manually
221///
222/// [`embed_migrations!`]: macro.embed_migrations.html
223pub struct Runner {
224  grouped: bool,
225  abort_divergent: bool,
226  abort_missing: bool,
227  migrations: Vec<Migration>,
228  target: Target,
229  migration_table_name: String,
230}
231
232impl Runner {
233  /// instantiate a new Runner
234  pub fn new(migrations: &[Migration]) -> Runner {
235    Runner {
236      grouped: false,
237      target: Target::Latest,
238      abort_divergent: true,
239      abort_missing: true,
240      migrations: migrations.to_vec(),
241      migration_table_name: DEFAULT_MIGRATION_TABLE_NAME.into(),
242    }
243  }
244
245  /// Get the gathered migrations.
246  pub fn get_migrations(&self) -> &Vec<Migration> {
247    &self.migrations
248  }
249
250  /// Set the target version up to which refinery should migrate, Latest migrates to the latest
251  /// version available Version migrates to a user provided version, a Version with a higher
252  /// version than the latest will be ignored, and Fake doesn't actually run any migration, just
253  /// creates and updates refinery's schema migration table by default this is set to Latest
254  pub fn set_target(self, target: Target) -> Runner {
255    Runner { target, ..self }
256  }
257
258  /// Set true if all migrations should be grouped and run in a single transaction.
259  /// by default this is set to false, each migration runs on their own transaction
260  ///
261  /// # Note
262  ///
263  /// set_grouped won't probably work on MySQL Databases as MySQL lacks support for transactions
264  /// around schema alteration operations, meaning that if a migration fails to apply you will
265  /// have to manually unpick the changes in order to try again (it’s impossible to roll back to an
266  /// earlier point).
267  pub fn set_grouped(self, grouped: bool) -> Runner {
268    Runner { grouped, ..self }
269  }
270
271  /// Set true if migration process should abort if divergent migrations are found
272  /// i.e. applied migrations with the same version but different name or checksum from the ones on
273  /// the filesystem. by default this is set to true
274  pub fn set_abort_divergent(self, abort_divergent: bool) -> Runner {
275    Runner {
276      abort_divergent,
277      ..self
278    }
279  }
280
281  /// Set true if migration process should abort if missing migrations are found
282  /// i.e. applied migrations that are not found on the filesystem,
283  /// or migrations found on filesystem with a version inferior to the last one applied but not
284  /// applied. by default this is set to true
285  pub fn set_abort_missing(self, abort_missing: bool) -> Runner {
286    Runner {
287      abort_missing,
288      ..self
289    }
290  }
291
292  /// Queries the database for the last applied migration, returns None if there aren't applied
293  /// Migrations
294  pub fn get_last_applied_migration<C>(&self, conn: &'_ mut C) -> Result<Option<Migration>, Error>
295  where
296    C: Migrate,
297  {
298    Migrate::get_last_applied_migration(conn, &self.migration_table_name)
299  }
300
301  /// Queries the database asynchronously for the last applied migration, returns None if there
302  /// aren't applied Migrations
303  pub async fn get_last_applied_migration_async<C>(
304    &self,
305    conn: &mut C,
306  ) -> Result<Option<Migration>, Error>
307  where
308    C: AsyncMigrate + Send,
309  {
310    AsyncMigrate::get_last_applied_migration(conn, &self.migration_table_name).await
311  }
312
313  /// Queries the database for all previous applied migrations
314  pub fn get_applied_migrations<C>(&self, conn: &'_ mut C) -> Result<Vec<Migration>, Error>
315  where
316    C: Migrate,
317  {
318    Migrate::get_applied_migrations(conn, &self.migration_table_name)
319  }
320
321  /// Queries the database asynchronously for all previous applied migrations
322  pub async fn get_applied_migrations_async<C>(&self, conn: &mut C) -> Result<Vec<Migration>, Error>
323  where
324    C: AsyncMigrate + Send,
325  {
326    AsyncMigrate::get_applied_migrations(conn, &self.migration_table_name).await
327  }
328
329  /// Set the table name to use for the migrations table. The default name is
330  /// `refinery_schema_history`
331  ///
332  /// ### Warning
333  /// Changing this can be disastrous for your database. You should verify that the migrations table
334  /// has the same name as the name you specify here, if this is changed on an existing project.
335  ///
336  /// # Panics
337  ///
338  /// If the provided `migration_table_name` is empty
339  pub fn set_migration_table_name<S: AsRef<str>>(&mut self, migration_table_name: S) -> &mut Self {
340    if migration_table_name.as_ref().is_empty() {
341      panic!("Migration table name must not be empty");
342    }
343
344    self.migration_table_name = migration_table_name.as_ref().to_string();
345    self
346  }
347
348  /// Creates an iterator over pending migrations, applying each before returning
349  /// the result from `next()`. If a migration fails, the iterator will return that
350  /// result and further calls to `next()` will return `None`.
351  pub fn run_iter<C>(
352    self,
353    connection: &mut C,
354  ) -> impl Iterator<Item = Result<Migration, Error>> + '_
355  where
356    C: Migrate,
357  {
358    RunIterator::new(self, connection)
359  }
360
361  /// Runs the Migrations in the supplied database connection
362  pub fn run<C>(&self, connection: &mut C) -> Result<Report, Error>
363  where
364    C: Migrate,
365  {
366    Migrate::migrate(
367      connection,
368      &self.migrations,
369      self.abort_divergent,
370      self.abort_missing,
371      self.grouped,
372      self.target,
373      &self.migration_table_name,
374    )
375  }
376
377  /// Runs the Migrations asynchronously in the supplied database connection
378  pub async fn run_async<C>(&self, connection: &mut C) -> Result<Report, Error>
379  where
380    C: AsyncMigrate + Send,
381  {
382    AsyncMigrate::migrate(
383      connection,
384      &self.migrations,
385      self.abort_divergent,
386      self.abort_missing,
387      self.grouped,
388      self.target,
389      &self.migration_table_name,
390    )
391    .await
392  }
393}
394
395pub struct RunIterator<'a, C> {
396  connection: &'a mut C,
397  target: Target,
398  migration_table_name: String,
399  items: VecDeque<Migration>,
400  failed: bool,
401}
402impl<'a, C> RunIterator<'a, C>
403where
404  C: Migrate,
405{
406  pub(crate) fn new(runner: Runner, connection: &'a mut C) -> RunIterator<'a, C> {
407    RunIterator {
408      items: VecDeque::from(
409        Migrate::get_unapplied_migrations(
410          connection,
411          &runner.migrations,
412          runner.abort_divergent,
413          runner.abort_missing,
414          &runner.migration_table_name,
415        )
416        .unwrap(),
417      ),
418      connection,
419      target: runner.target,
420      migration_table_name: runner.migration_table_name.clone(),
421      failed: false,
422    }
423  }
424}
425impl<C> Iterator for RunIterator<'_, C>
426where
427  C: Migrate,
428{
429  type Item = Result<Migration, Error>;
430
431  fn next(&mut self) -> Option<Self::Item> {
432    match self.failed {
433      true => None,
434      false => self.items.pop_front().and_then(|migration| {
435        sync_migrate(
436          self.connection,
437          vec![migration],
438          self.target,
439          &self.migration_table_name,
440          false,
441        )
442        .map(|r| r.applied_migrations.first().cloned())
443        .map_err(|e| {
444          error!("migration failed: {e:?}");
445          self.failed = true;
446          e
447        })
448        .transpose()
449      }),
450    }
451  }
452}