strut_database/
migrations.rs

1use sqlx::Error as SqlxError;
2use sqlx_core::database::Database;
3use sqlx_core::migrate::{Migrate, MigrateError, Migrator};
4use sqlx_core::pool::Pool;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::Arc;
7use strut_core::{AppContext, AppSpindown, AppSpindownToken};
8use strut_sync::{Gate, Latch};
9use strut_util::Backoff;
10use tokio::select;
11use tracing::{error, info, warn};
12
13/// An asynchronous worker that applies database migrations in the background
14/// and signals completion to whoever cares to listen via the returned [`Gate`].
15pub struct MigrationsWorker<DB>
16where
17    DB: Database,
18    <DB as Database>::Connection: Migrate,
19{
20    name: Arc<str>,
21    migrator: &'static Migrator,
22    pool: Pool<DB>,
23    backoff: Backoff,
24    latch: Latch,
25    _spindown_token: AppSpindownToken,
26}
27
28impl<DB> MigrationsWorker<DB>
29where
30    DB: Database,
31    <DB as Database>::Connection: Migrate,
32{
33    /// Starts applying migrations using the given static reference to a
34    /// [`migrator`](Migrator) and the given [`pool`](Pool) of database
35    /// connections. The given `name` is attached to this [`MigrationsWorker`]
36    /// for logging/alerting purposes.
37    ///
38    /// The returned [`Gate`] can be used to await for the migrations to be
39    /// successfully applied.
40    pub async fn start(name: impl AsRef<str>, migrator: &'static Migrator, pool: Pool<DB>) -> Gate {
41        let name = Self::compose_name(name);
42        let backoff = Backoff::default();
43        let latch = Latch::new();
44        let gate = latch.gate();
45        let _spindown_token = AppSpindown::register(&name);
46
47        let worker = Self {
48            name,
49            migrator,
50            pool,
51            backoff,
52            latch,
53            _spindown_token,
54        };
55
56        tokio::spawn(worker.apply());
57
58        gate
59    }
60
61    /// Composes a human-readable name for this worker.
62    fn compose_name(name: impl AsRef<str>) -> Arc<str> {
63        static COUNTER: AtomicUsize = AtomicUsize::new(0);
64
65        Arc::from(format!(
66            "database:migrations:{}:{}",
67            name.as_ref(),
68            COUNTER.fetch_add(1, Ordering::Relaxed),
69        ))
70    }
71
72    /// Repeatedly attempts to apply the migrations until it either succeeds or
73    /// the global [`AppContext`] is terminated.
74    async fn apply(self) {
75        loop {
76            let state = select! {
77                biased;
78                _ = AppContext::terminated() => ServingState::Terminated,
79                result = self.migrator.run(&self.pool) => self.interpret_result(result).await,
80            };
81
82            if matches!(state, ServingState::Terminated) {
83                break;
84            }
85        }
86
87        // Nothing to clean up, we just have to allow the self._spindown_token to be dropped
88    }
89
90    /// Interprets the result of a single attempt to apply migrations.
91    async fn interpret_result(&self, result: Result<(), MigrateError>) -> ServingState {
92        match result {
93            // Migrations successfully applied
94            Ok(_) => {
95                info!(
96                    name = self.name.as_ref(),
97                    "Successfully applied the database migrations",
98                );
99
100                // This is important, as it will unblock any dependent resources
101                self.latch.release();
102
103                // No further attempts are necessary
104                ServingState::Terminated
105            }
106
107            // Something went wrong while applying migrations
108            Err(error) => {
109                // Report the situation
110                self.report_error(&error);
111
112                // Wait a bit before re-trying
113                select! {
114                    biased;
115                    _ = AppContext::terminated() => ServingState::Terminated,
116                    _ = self.backoff.sleep_next() => ServingState::Ongoing,
117                }
118            }
119        }
120    }
121
122    /// Reports the given [`MigrateError`] depending on its perceived severity.
123    fn report_error(&self, error: &MigrateError) {
124        if error.is_significant() {
125            error!(
126                alert = true,
127                name = self.name.as_ref(),
128                ?error,
129                error_message = %error,
130                "Failed to apply the database migrations",
131            );
132        } else {
133            warn!(
134                name = self.name.as_ref(),
135                ?error,
136                error_message = %error,
137                "Temporarily unable to apply the database migrations",
138            );
139        }
140    }
141}
142
143/// Internal marker that indicates the state of this worker.
144enum ServingState {
145    Ongoing,
146    Terminated,
147}
148
149/// Internal convenience trait for determining whether an `sqlx` error should be
150/// perceived as significant for the logging/alerting purposes.
151trait IsSignificant {
152    fn is_significant(&self) -> bool;
153}
154
155impl IsSignificant for MigrateError {
156    fn is_significant(&self) -> bool {
157        match self {
158            MigrateError::Execute(error) => error.is_significant(),
159            _ => true,
160        }
161    }
162}
163
164impl IsSignificant for SqlxError {
165    fn is_significant(&self) -> bool {
166        match self {
167            SqlxError::PoolTimedOut => false, // just a temporary connectivity issue
168            _ => true,
169        }
170    }
171}