tern_core/
migration.rs

1//! This module contains types and traits related to the migration files.
2//!
3//! * [`Migration`] is the abstract representation of what is built from a
4//!   migration file.
5//! * [`QueryBuilder`] is the recipe for building the query for a migration.
6//! * [`MigrationSource`] is the ability to produce the set of migrations, a
7//!   [`MigrationSet`], for a particular context in order to be ran in that
8//!   context.
9//! * [`MigrationContext`] is the core type.  It has an associated [`Executor`]
10//!   and it can produce the migrations from source.  Combined, it has the full
11//!   functionality of the migration tool.
12//!
13//! Generally these don't need to be implemented.  Their corresponding derive
14//! macros can be used instead.
15use chrono::{DateTime, Utc};
16use futures_core::{future::BoxFuture, Future};
17use std::{fmt::Write, time::Instant};
18
19use crate::error::{DatabaseError as _, TernResult};
20
21/// The context in which a migration run occurs.
22pub trait MigrationContext
23where
24    Self: MigrationSource<Ctx = Self> + Send + Sync + 'static,
25{
26    /// The name of the table in the database that tracks the history of this
27    /// migration set.
28    ///
29    /// It defaults to `_tern_migrations` in the default schema for the
30    /// database driver if using the derive macro for this trait.
31    const HISTORY_TABLE: &str;
32
33    /// The type for executing queries in a migration run.
34    type Exec: Executor;
35
36    /// A reference to the underlying `Executor`.
37    fn executor(&mut self) -> &mut Self::Exec;
38
39    /// For a migration that is capable of building its query in this migration
40    /// context, this builds the query, applies the migration, then updates the
41    /// schema history table after.
42    fn apply<'migration, 'conn: 'migration, M>(
43        &'conn mut self,
44        migration: &'migration M,
45    ) -> BoxFuture<'migration, TernResult<AppliedMigration>>
46    where
47        M: Migration<Ctx = Self> + Send + Sync + ?Sized,
48    {
49        Box::pin(async move {
50            let start = Instant::now();
51            let query = M::build(migration, self).await?;
52            let executor = self.executor();
53
54            if migration.no_tx() {
55                executor
56                    .apply_no_tx(&query)
57                    .await
58                    .void_tern_migration_result(migration)?;
59            } else {
60                executor
61                    .apply_tx(&query)
62                    .await
63                    .void_tern_migration_result(migration)?;
64            }
65
66            let applied_at = Utc::now();
67            let duration_ms = start.elapsed().as_millis() as i64;
68            let applied = migration.to_applied(duration_ms, applied_at, query.sql());
69            executor
70                .insert_applied_migration(Self::HISTORY_TABLE, &applied)
71                .await?;
72
73            Ok(applied)
74        })
75    }
76
77    /// Gets the version of the most recently applied migration.
78    fn latest_version(&mut self) -> BoxFuture<'_, TernResult<Option<i64>>> {
79        Box::pin(async move {
80            let executor = self.executor();
81            let latest = executor
82                .get_all_applied(Self::HISTORY_TABLE)
83                .await?
84                .into_iter()
85                .fold(None, |acc, m| match acc {
86                    None => Some(m.version),
87                    Some(v) if m.version > v => Some(m.version),
88                    _ => acc,
89                });
90
91            Ok(latest)
92        })
93    }
94
95    /// Get all previously applied migrations.
96    fn previously_applied(&mut self) -> BoxFuture<'_, TernResult<Vec<AppliedMigration>>> {
97        Box::pin(async move {
98            let executor = self.executor();
99            let applied = executor.get_all_applied(Self::HISTORY_TABLE).await?;
100
101            Ok(applied)
102        })
103    }
104
105    /// Check that the history table exists and create it if not.
106    fn check_history_table(&mut self) -> BoxFuture<'_, TernResult<()>> {
107        Box::pin(async move {
108            let executor = self.executor();
109            executor
110                .create_history_if_not_exists(Self::HISTORY_TABLE)
111                .await?;
112
113            Ok(())
114        })
115    }
116
117    /// Drop the history table if requested.
118    fn drop_history_table(&mut self) -> BoxFuture<'_, TernResult<()>> {
119        Box::pin(async move {
120            let executor = self.executor();
121            executor.drop_history(Self::HISTORY_TABLE).await?;
122
123            Ok(())
124        })
125    }
126
127    /// Upsert applied migrations.
128    fn upsert_applied<'migration, 'conn: 'migration>(
129        &'conn mut self,
130        applied: &'migration AppliedMigration,
131    ) -> BoxFuture<'migration, TernResult<()>> {
132        Box::pin(async move {
133            self.executor()
134                .upsert_applied_migration(Self::HISTORY_TABLE, applied)
135                .await?;
136
137            Ok(())
138        })
139    }
140}
141
142/// The "executor" type for the database backend ultimately responsible for
143/// issuing migration and schema history queries.
144pub trait Executor
145where
146    Self: Send + Sync + 'static,
147{
148    /// The type of value that can produce queries for the history table of this
149    /// migration set.
150    type Queries: QueryRepository;
151
152    /// Apply the `Query` for the migration in a transaction.
153    fn apply_tx(&mut self, query: &Query) -> impl Future<Output = TernResult<()>> + Send;
154
155    /// Apply the `Query` for the migration _not_ in a transaction.
156    fn apply_no_tx(&mut self, query: &Query) -> impl Future<Output = TernResult<()>> + Send;
157
158    /// `CREATE IF NOT EXISTS` the history table.
159    fn create_history_if_not_exists(
160        &mut self,
161        history_table: &str,
162    ) -> impl Future<Output = TernResult<()>> + Send;
163
164    /// `DROP` the history table.
165    fn drop_history(&mut self, history_table: &str) -> impl Future<Output = TernResult<()>> + Send;
166
167    /// Get the complete history of applied migrations.
168    fn get_all_applied(
169        &mut self,
170        history_table: &str,
171    ) -> impl Future<Output = TernResult<Vec<AppliedMigration>>> + Send;
172
173    /// Insert an applied migration into the history table.
174    fn insert_applied_migration(
175        &mut self,
176        history_table: &str,
177        applied: &AppliedMigration,
178    ) -> impl Future<Output = TernResult<()>> + Send;
179
180    /// Update or insert an applied migration.
181    fn upsert_applied_migration(
182        &mut self,
183        history_table: &str,
184        applied: &AppliedMigration,
185    ) -> impl Future<Output = TernResult<()>> + Send;
186}
187
188/// A type that has a library of "administrative" queries that are needed during
189/// a migration run.
190pub trait QueryRepository {
191    /// The query that creates the schema history table or does nothing if it
192    /// already exists.
193    fn create_history_if_not_exists_query(history_table: &str) -> Query;
194
195    /// The query that drops the history table if requested.
196    fn drop_history_query(history_table: &str) -> Query;
197
198    /// The query to update the schema history table with an applied migration.
199    fn insert_into_history_query(history_table: &str, applied: &AppliedMigration) -> Query;
200
201    /// The query to return all rows from the schema history table.
202    fn select_star_from_history_query(history_table: &str) -> Query;
203
204    /// Query to insert or update a record in the history table.
205    fn upsert_history_query(history_table: &str, applied: &AppliedMigration) -> Query;
206}
207
208/// A single migration in a migration set.
209pub trait Migration
210where
211    Self: Send + Sync,
212{
213    /// A migration context that is sufficient to build this migration.
214    type Ctx: MigrationContext;
215
216    /// Get the `MigrationId` for this migration.
217    fn migration_id(&self) -> MigrationId;
218
219    /// The raw file content of the migration source file, or when stored as an
220    /// applied migration in the history table, it is the query that was ran.
221    fn content(&self) -> String;
222
223    /// Whether this migration should not be applied in a database transaction.
224    fn no_tx(&self) -> bool;
225
226    /// Produce a future resolving to the migration query when `await`ed.
227    fn build<'a>(&'a self, ctx: &'a mut Self::Ctx) -> BoxFuture<'a, TernResult<Query>>;
228
229    /// The migration version.
230    fn version(&self) -> i64 {
231        self.migration_id().version()
232    }
233
234    /// Convert this migration to an [`AppliedMigration`] assuming that it was
235    /// successfully applied.
236    fn to_applied(
237        &self,
238        duration_ms: i64,
239        applied_at: DateTime<Utc>,
240        content: &str,
241    ) -> AppliedMigration {
242        AppliedMigration::new(self.migration_id(), content, duration_ms, applied_at)
243    }
244}
245
246/// A type that is used to collect a [`MigrationSet`] -- migrations that are not
247/// applied yet -- which is used as the input to runner commands.
248pub trait MigrationSource {
249    /// A context that the set of migrations returned by `migration_set` would
250    /// need in order to be applied.
251    type Ctx: MigrationContext;
252
253    /// The set of migrations since the last apply.
254    fn migration_set(&self, latest_version: Option<i64>) -> MigrationSet<Self::Ctx>;
255}
256
257/// The `Migration`s derived from the files in the source directory that need to
258/// be applied.
259pub struct MigrationSet<Ctx: ?Sized> {
260    pub migrations: Vec<Box<dyn Migration<Ctx = Ctx>>>,
261}
262
263impl<Ctx> MigrationSet<Ctx>
264where
265    Ctx: MigrationContext,
266{
267    pub fn new<T>(vs: T) -> MigrationSet<Ctx>
268    where
269        T: Into<Vec<Box<dyn Migration<Ctx = Ctx>>>>,
270    {
271        let mut migrations = vs.into();
272        migrations.sort_by_key(|m| m.version());
273        MigrationSet { migrations }
274    }
275
276    /// Number of migrations in the set.
277    pub fn len(&self) -> usize {
278        self.migrations.len()
279    }
280
281    /// Versions present in this migration set.
282    pub fn versions(&self) -> Vec<i64> {
283        self.migrations
284            .iter()
285            .map(|m| m.version())
286            .collect::<Vec<_>>()
287    }
288
289    /// The version/name of migrations in this migration set.
290    pub fn migration_ids(&self) -> Vec<MigrationId> {
291        self.migrations
292            .iter()
293            .map(|m| m.migration_id())
294            .collect::<Vec<_>>()
295    }
296
297    /// The latest version in the set.
298    pub fn max(&self) -> Option<i64> {
299        self.versions().iter().max().copied()
300    }
301
302    /// The set is empty for the requested operation.
303    pub fn is_empty(&self) -> bool {
304        self.len() == 0
305    }
306}
307
308/// A helper trait for [`Migration`].
309///
310/// With the derive macros, the user's responsibility is to implement this for
311/// a Rust migration, and the proc macro uses it to build an implementation of
312/// [`Migration`].
313pub trait QueryBuilder {
314    /// The context for running the migration this query is for.
315    type Ctx: MigrationContext;
316
317    /// Asynchronously produce the migration query.
318    fn build(&self, ctx: &mut Self::Ctx) -> impl Future<Output = TernResult<Query>> + Send;
319}
320
321/// A SQL query.
322pub struct Query(String);
323
324impl Query {
325    pub fn new(sql: String) -> Self {
326        Self(sql)
327    }
328
329    pub fn sql(&self) -> &str {
330        &self.0
331    }
332
333    // TODO(quasi-coherent): Support different types of comment syntax.
334    /// Split a query comprised of multiple statements.
335    ///
336    /// For queries having `no_tx = true`, a migration comprised of multiple,
337    /// separate SQL statements needs to be broken up so that the statements can
338    /// run sequentially.  Otherwise, many backends will run the collection of
339    /// statements in a transaction automatically, which breaches the `no_tx`
340    /// contract.
341    ///
342    /// _Note_: This depends on comments in a .sql file being only of the `--`
343    /// flavor.  A future version will be smarter than that.
344    pub fn split_statements(&self) -> TernResult<Vec<String>> {
345        let mut statements = Vec::new();
346        self.sql()
347            .lines()
348            .try_fold(String::new(), |mut buf, line| {
349                let line = line.trim();
350                writeln!(buf, "{line}")?;
351                // If a line ends with `;` and is not a comment, this is the
352                // last line of the statement.  So push to `statements` and
353                // reset the buffer for parsing the next statement.
354                if line.ends_with(";") && !line.starts_with("--") {
355                    statements.push(buf);
356                    Ok::<String, std::fmt::Error>(String::new())
357                } else {
358                    Ok(buf)
359                }
360            })?;
361
362        Ok(statements)
363    }
364}
365
366/// Name/version derived from the migration source filename.
367#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
368pub struct MigrationId {
369    /// Version parsed from the migration filename.
370    version: i64,
371    /// Description parsed from the migration filename.
372    description: String,
373}
374
375impl MigrationId {
376    pub fn new(version: i64, description: String) -> Self {
377        Self {
378            version,
379            description,
380        }
381    }
382
383    pub fn version(&self) -> i64 {
384        self.version
385    }
386
387    pub fn description(&self) -> String {
388        self.description.clone()
389    }
390}
391
392impl std::fmt::Display for MigrationId {
393    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
394        write!(f, "V{}__{}", self.version, self.description)
395    }
396}
397
398impl From<AppliedMigration> for MigrationId {
399    fn from(value: AppliedMigration) -> Self {
400        Self {
401            version: value.version,
402            description: value.description,
403        }
404    }
405}
406
407/// An `AppliedMigration` is the information about a migration that completed
408/// successfully and it is also a row in the schema history table.
409#[derive(Debug, Clone)]
410#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
411pub struct AppliedMigration {
412    /// The migration version.
413    pub version: i64,
414    /// The description of the migration.
415    pub description: String,
416    /// The contents of the migration file at the time it was applied.
417    pub content: String,
418    /// How long the migration took to run in milliseconds.
419    pub duration_ms: i64,
420    /// The timestamp of when the migration was applied.
421    pub applied_at: DateTime<Utc>,
422}
423
424impl AppliedMigration {
425    pub fn new(
426        id: MigrationId,
427        content: &str,
428        duration_ms: i64,
429        applied_at: DateTime<Utc>,
430    ) -> Self {
431        Self {
432            version: id.version,
433            description: id.description,
434            content: content.into(),
435            duration_ms,
436            applied_at,
437        }
438    }
439}