tern_core/
migration.rs

//! This module contains types and traits related to the migration files.
//!
//! * [`Migration`] is the abstract representation of what is built from a
//!   migration file.
//! * [`QueryBuilder`] is the recipe for building the query for a migration.
//! * [`MigrationSource`] is the ability to produce the set of migrations, a
//!   [`MigrationSet`], for a particular context in order to be run in that
//!   context.
//! * [`MigrationContext`] is the core type.  It has an associated [`Executor`]
//!   and it can produce the migrations from source.  Combined, it has the full
//!   functionality of the migration tool.
//!
//! Generally these shouldn't be implemented; use the corresponding derive macro
//! instead.
15use crate::error::{DatabaseError as _, TernResult};
16
17use chrono::{DateTime, Utc};
18use futures_core::{future::BoxFuture, Future};
19use std::{fmt::Write, time::Instant};
20
21/// The context in which a migration run occurs.
22pub trait MigrationContext
23where
24    Self: MigrationSource<Ctx = Self> + Send + Sync + 'static,
25{
26    /// The name of the table in the database that tracks the history of this
27    /// migration set.
28    ///
29    /// It defaults to `_tern_migrations` in the default schema for the
30    /// database driver if using the derive macro for this trait.
31    const HISTORY_TABLE: &str;
32
33    /// The type for executing queries in a migration run.
34    type Exec: Executor;
35
36    /// A reference to the underlying `Executor`.
37    fn executor(&mut self) -> &mut Self::Exec;
38
39    /// For a migration that is capable of building its query in this migration
40    /// context, this builds the query, applies the migration, then updates the
41    /// schema history table after.
42    fn apply<'migration, 'conn: 'migration, M>(
43        &'conn mut self,
44        migration: &'migration M,
45    ) -> BoxFuture<'migration, TernResult<AppliedMigration>>
46    where
47        M: Migration<Ctx = Self> + Send + Sync + ?Sized,
48    {
49        Box::pin(async move {
50            let start = Instant::now();
51            let query = M::build(migration, self).await?;
52            let executor = self.executor();
53
54            if migration.no_tx() {
55                executor
56                    .apply_no_tx(&query)
57                    .await
58                    .void_tern_migration_result(migration)?;
59            } else {
60                executor
61                    .apply_tx(&query)
62                    .await
63                    .void_tern_migration_result(migration)?;
64            }
65
66            let applied_at = Utc::now();
67            let duration_ms = start.elapsed().as_millis() as i64;
68            let applied = migration.to_applied(duration_ms, applied_at, query.sql());
69            executor
70                .insert_applied_migration(Self::HISTORY_TABLE, &applied)
71                .await?;
72
73            Ok(applied)
74        })
75    }
76
77    /// Gets the version of the most recently applied migration.
78    fn latest_version(&mut self) -> BoxFuture<'_, TernResult<Option<i64>>> {
79        Box::pin(async move {
80            let latest = self
81                .executor()
82                .get_all_applied(Self::HISTORY_TABLE)
83                .await?
84                .into_iter()
85                .fold(None, |acc, m| match acc {
86                    None => Some(m.version),
87                    Some(v) if m.version > v => Some(m.version),
88                    _ => acc,
89                });
90
91            Ok(latest)
92        })
93    }
94
95    /// Get all previously applied migrations.
96    fn previously_applied(&mut self) -> BoxFuture<'_, TernResult<Vec<AppliedMigration>>> {
97        Box::pin(self.executor().get_all_applied(Self::HISTORY_TABLE))
98    }
99
100    /// Check that the history table exists and create it if not.
101    fn check_history_table(&mut self) -> BoxFuture<'_, TernResult<()>> {
102        Box::pin(
103            self.executor()
104                .create_history_if_not_exists(Self::HISTORY_TABLE),
105        )
106    }
107
108    /// Drop the history table if requested.
109    fn drop_history_table(&mut self) -> BoxFuture<'_, TernResult<()>> {
110        Box::pin(self.executor().drop_history(Self::HISTORY_TABLE))
111    }
112
113    /// Insert an applied migration.
114    fn insert_applied<'migration, 'conn: 'migration>(
115        &'conn mut self,
116        applied: &'migration AppliedMigration,
117    ) -> BoxFuture<'migration, TernResult<()>> {
118        Box::pin(
119            self.executor()
120                .insert_applied_migration(Self::HISTORY_TABLE, applied),
121        )
122    }
123
124    /// Upsert applied migrations.
125    fn upsert_applied<'migration, 'conn: 'migration>(
126        &'conn mut self,
127        applied: &'migration AppliedMigration,
128    ) -> BoxFuture<'migration, TernResult<()>> {
129        Box::pin(
130            self.executor()
131                .upsert_applied_migration(Self::HISTORY_TABLE, applied),
132        )
133    }
134}
135
136/// The "executor" type for the database backend ultimately responsible for
137/// issuing migration and schema history queries.
138pub trait Executor
139where
140    Self: Send + Sync + 'static,
141{
142    /// The type of value that can produce queries for the history table of this
143    /// migration set.
144    type Queries: QueryRepository;
145
146    /// Apply the `Query` for the migration in a transaction.
147    fn apply_tx(&mut self, query: &Query) -> impl Future<Output = TernResult<()>> + Send;
148
149    /// Apply the `Query` for the migration _not_ in a transaction.
150    fn apply_no_tx(&mut self, query: &Query) -> impl Future<Output = TernResult<()>> + Send;
151
152    /// `CREATE IF NOT EXISTS` the history table.
153    fn create_history_if_not_exists(
154        &mut self,
155        history_table: &str,
156    ) -> impl Future<Output = TernResult<()>> + Send;
157
158    /// `DROP` the history table.
159    fn drop_history(&mut self, history_table: &str) -> impl Future<Output = TernResult<()>> + Send;
160
161    /// Get the complete history of applied migrations.
162    fn get_all_applied(
163        &mut self,
164        history_table: &str,
165    ) -> impl Future<Output = TernResult<Vec<AppliedMigration>>> + Send;
166
167    /// Insert an applied migration into the history table.
168    fn insert_applied_migration(
169        &mut self,
170        history_table: &str,
171        applied: &AppliedMigration,
172    ) -> impl Future<Output = TernResult<()>> + Send;
173
174    /// Update or insert an applied migration.
175    fn upsert_applied_migration(
176        &mut self,
177        history_table: &str,
178        applied: &AppliedMigration,
179    ) -> impl Future<Output = TernResult<()>> + Send;
180}
181
/// A type that has a library of "administrative" queries that are needed during
/// a migration run.
///
/// All functions are associated functions (no `self`): each one takes the name
/// of the history table and returns the [`Query`] to run against it.
pub trait QueryRepository {
    /// The query that creates the schema history table or does nothing if it
    /// already exists.
    fn create_history_if_not_exists_query(history_table: &str) -> Query;

    /// The query that drops the history table if requested.
    fn drop_history_query(history_table: &str) -> Query;

    /// The query to update the schema history table with an applied migration.
    fn insert_into_history_query(history_table: &str, applied: &AppliedMigration) -> Query;

    /// The query to return all rows from the schema history table.
    fn select_star_from_history_query(history_table: &str) -> Query;

    /// Query to insert or update a record in the history table.
    fn upsert_history_query(history_table: &str, applied: &AppliedMigration) -> Query;
}
201
202/// A single migration in a migration set.
203pub trait Migration
204where
205    Self: Send + Sync,
206{
207    /// A migration context that is sufficient to build this migration.
208    type Ctx: MigrationContext;
209
210    /// Get the `MigrationId` for this migration.
211    fn migration_id(&self) -> MigrationId;
212
213    /// The raw file content of the migration source file, or when stored as an
214    /// applied migration in the history table, it is the query that was ran.
215    fn content(&self) -> String;
216
217    /// Whether this migration should not be applied in a database transaction.
218    fn no_tx(&self) -> bool;
219
220    /// Produce a future resolving to the migration query when `await`ed.
221    fn build<'a>(&'a self, ctx: &'a mut Self::Ctx) -> BoxFuture<'a, TernResult<Query>>;
222
223    /// The migration version.
224    fn version(&self) -> i64 {
225        self.migration_id().version()
226    }
227
228    /// Convert this migration to an [`AppliedMigration`] assuming that it was
229    /// successfully applied.
230    fn to_applied(
231        &self,
232        duration_ms: i64,
233        applied_at: DateTime<Utc>,
234        content: &str,
235    ) -> AppliedMigration {
236        AppliedMigration::new(self.migration_id(), content, duration_ms, applied_at)
237    }
238}
239
/// A type that is used to collect a [`MigrationSet`] -- migrations that are not
/// applied yet -- which is used as the input to runner commands.
pub trait MigrationSource {
    /// A context that the set of migrations returned by `migration_set` would
    /// need in order to be applied.
    type Ctx: MigrationContext;

    /// The set of migrations since `last_applied`.
    ///
    /// NOTE(review): `last_applied` is presumably the version of the most
    /// recently applied migration, with `None` meaning nothing has been applied
    /// yet -- confirm against the implementors.
    fn migration_set(&self, last_applied: Option<i64>) -> MigrationSet<Self::Ctx>;
}
250
/// The `Migration`s derived from the files in the source directory that need to
/// be applied.
pub struct MigrationSet<Ctx: ?Sized> {
    /// The boxed migrations in this set.  `MigrationSet::new` sorts them by
    /// ascending version; because the field is public, building or mutating
    /// the struct directly bypasses that sort.
    pub migrations: Vec<Box<dyn Migration<Ctx = Ctx>>>,
}
256
257impl<Ctx> MigrationSet<Ctx>
258where
259    Ctx: MigrationContext,
260{
261    pub fn new<T>(vs: T) -> MigrationSet<Ctx>
262    where
263        T: Into<Vec<Box<dyn Migration<Ctx = Ctx>>>>,
264    {
265        let mut migrations = vs.into();
266        migrations.sort_by_key(|m| m.version());
267        MigrationSet { migrations }
268    }
269
270    /// Number of migrations in the set.
271    pub fn len(&self) -> usize {
272        self.migrations.len()
273    }
274
275    /// Versions present in this migration set.
276    pub fn versions(&self) -> Vec<i64> {
277        self.migrations
278            .iter()
279            .map(|m| m.version())
280            .collect::<Vec<_>>()
281    }
282
283    /// The version/name of migrations in this migration set.
284    pub fn migration_ids(&self) -> Vec<MigrationId> {
285        self.migrations
286            .iter()
287            .map(|m| m.migration_id())
288            .collect::<Vec<_>>()
289    }
290
291    /// The latest version in the set.
292    pub fn max(&self) -> Option<i64> {
293        self.versions().iter().max().copied()
294    }
295
296    /// The set is empty for the requested operation.
297    pub fn is_empty(&self) -> bool {
298        self.len() == 0
299    }
300}
301
/// A helper trait for [`Migration`].
///
/// With the derive macros, the user's responsibility is to implement this for
/// a Rust migration, and the proc macro uses it to build an implementation of
/// [`Migration`].
pub trait QueryBuilder {
    /// The context for running the migration this query is for.
    type Ctx: MigrationContext;

    /// Asynchronously produce the migration query.
    ///
    /// The context is passed mutably, and the returned future must be `Send`.
    fn build(&self, ctx: &mut Self::Ctx) -> impl Future<Output = TernResult<Query>> + Send;
}
314
315/// A SQL query.
316#[derive(Debug, Clone)]
317pub struct Query(pub(crate) String);
318
319impl Query {
320    pub fn new(sql: String) -> Self {
321        Self(sql)
322    }
323
324    fn sanitize(&self) -> String {
325        use regex::Regex;
326        let block_comment = Regex::new(r"\/\*(?s).*?(?-s)\*\/").unwrap();
327        let sql = self
328            .sql()
329            .trim()
330            .lines()
331            .filter(|line| {
332                let line = line.trim();
333                !line.starts_with("--") || line.is_empty()
334            })
335            .map(|line| {
336                // Remove trailing comments: "SELECT a -- like this"
337                let mut stripped = line.to_string();
338                let offset = stripped.find("--").unwrap_or(stripped.len());
339                stripped.replace_range(offset.., "");
340                stripped.trim_end().to_string()
341            })
342            .collect::<Vec<_>>()
343            .join("\n");
344        let stripped = block_comment.replace_all(&sql, "");
345
346        if !stripped.ends_with(";") {
347            format!("{stripped};")
348        } else {
349            stripped.to_string()
350        }
351    }
352
353    pub fn sql(&self) -> &str {
354        &self.0
355    }
356
357    /// Add another query to the end of this one.
358    pub fn append(&mut self, other: Self) -> TernResult<()> {
359        let mut buf = String::new();
360        writeln!(buf, "{}", self.0)?;
361        writeln!(buf, "{}", other.0)?;
362        self.0 = buf;
363        Ok(())
364    }
365
366    /// Split a query comprised of multiple statements.
367    ///
368    /// For queries having `no_tx = true`, a migration comprised of multiple,
369    /// separate SQL statements needs to be broken up so that the statements can
370    /// run sequentially.  Otherwise, many backends will run the collection of
371    /// statements in a transaction automatically, which breaches the `no_tx`
372    /// contract.
373    ///
374    /// _Warning_: This is sensitive to the particular character sequence for
375    /// writing comments.  Only `--` and C-style `/* ... */` are treated
376    /// correctly because this is valid comment syntax in any of the supported
377    /// backends.  A line starting with `#`, for instance, will not be treated as
378    /// a comment, and so only in MySQL where that does denote a comment, the
379    /// function may not separate multiple statements correctly, possibly leading
380    /// to syntax errors during query execution.
381    pub fn split_statements(&self) -> TernResult<Vec<String>> {
382        let mut statements = Vec::new();
383        self.sanitize()
384            .lines()
385            .try_fold(String::new(), |mut buf, line| {
386                if line.trim().is_empty() {
387                    return Ok(buf);
388                }
389                writeln!(buf, "{line}")?;
390                // If the line ends with `;` this is the end of the statement, so
391                // push the accumulated buffer to the vector and start a new one.
392                if line.ends_with(";") {
393                    statements.push(buf);
394                    Ok::<String, std::fmt::Error>(String::new())
395                } else {
396                    Ok(buf)
397                }
398            })?;
399
400        Ok(statements)
401    }
402}
403
404impl std::fmt::Display for Query {
405    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
406        self.0.fmt(f)
407    }
408}
409
/// Name/version derived from the migration source filename.
///
/// Ordering compares `version` first and `description` second, following the
/// field order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MigrationId {
    /// Version parsed from the migration filename.
    version: i64,
    /// Description parsed from the migration filename.
    description: String,
}

impl MigrationId {
    /// Create an id from a version and a description.
    pub fn new(version: i64, description: String) -> Self {
        MigrationId {
            version,
            description,
        }
    }

    /// The version part of the id.
    pub fn version(&self) -> i64 {
        self.version
    }

    /// An owned copy of the description part of the id.
    pub fn description(&self) -> String {
        self.description.to_owned()
    }
}
435
436impl std::fmt::Display for MigrationId {
437    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438        write!(f, "V{}__{}", self.version, self.description)
439    }
440}
441
442impl From<AppliedMigration> for MigrationId {
443    fn from(value: AppliedMigration) -> Self {
444        Self {
445            version: value.version,
446            description: value.description,
447        }
448    }
449}
450
451/// An `AppliedMigration` is the information about a migration that completed
452/// successfully and it is also a row in the schema history table.
453#[derive(Debug, Clone)]
454#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
455pub struct AppliedMigration {
456    /// The migration version.
457    pub version: i64,
458    /// The description of the migration.
459    pub description: String,
460    /// The contents of the migration file at the time it was applied.
461    pub content: String,
462    /// How long the migration took to run in milliseconds.
463    pub duration_ms: i64,
464    /// The timestamp of when the migration was applied.
465    pub applied_at: DateTime<Utc>,
466}
467
468impl AppliedMigration {
469    pub fn new(
470        id: MigrationId,
471        content: &str,
472        duration_ms: i64,
473        applied_at: DateTime<Utc>,
474    ) -> Self {
475        Self {
476            version: id.version,
477            description: id.description,
478            content: content.into(),
479            duration_ms,
480            applied_at,
481        }
482    }
483}
484
#[cfg(test)]
mod tests {
    use super::Query;

    // Single statement with a leading line comment and no terminating `;`.
    const SQL_IN1: &str = "
-- This is a comment.
SELECT
  *
FROM
  the_schema.the_table
WHERE
  everything = 'is_good'
";
    const SQL_OUT1: &str = "SELECT
  *
FROM
  the_schema.the_table
WHERE
  everything = 'is_good';
";
    // Two statements mixing line comments, trailing comments, and inline and
    // multi-line block comments.
    const SQL_IN2: &str = "
-- tern:noTransaction
SELECT count(e.*),
  e.x,
  e.y -- This is the column called `y`
FROM /* A comment block can even be like this */ the_table
  as e
JOIN another USING (id)
/*
This is a multi
line
comment
*/
WHERE false;

SELECT a
from x
-- Asdfsdfsdfsdfsdsdf /* Unnecessary comment */
where false

;
";
    const SQL_OUT21: &str = "SELECT count(e.*),
  e.x,
  e.y
FROM  the_table
  as e
JOIN another USING (id)
WHERE false;
";

    const SQL_OUT22: &str = "SELECT a
from x
where false
;
";

    /// Split `sql` into statements, failing the test if splitting errors.
    fn split(sql: &str) -> Vec<String> {
        let outcome = Query::new(sql.to_owned()).split_statements();
        assert!(outcome.is_ok());
        outcome.unwrap()
    }

    #[test]
    fn split_one() {
        let expected = vec![SQL_OUT1.to_string()];
        assert_eq!(split(SQL_IN1), expected);
    }

    #[test]
    fn split_two() {
        let expected = vec![SQL_OUT21.to_string(), SQL_OUT22.to_string()];
        assert_eq!(split(SQL_IN2), expected);
    }
}
559}