strut_database/
migrations.rs1use sqlx::Error as SqlxError;
2use sqlx_core::database::Database;
3use sqlx_core::migrate::{Migrate, MigrateError, Migrator};
4use sqlx_core::pool::Pool;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::Arc;
7use strut_core::{AppContext, AppSpindown, AppSpindownToken};
8use strut_sync::{Gate, Latch};
9use strut_util::Backoff;
10use tokio::select;
11use tracing::{error, info, warn};
12
13pub struct MigrationsWorker<DB>
16where
17 DB: Database,
18 <DB as Database>::Connection: Migrate,
19{
20 name: Arc<str>,
21 migrator: &'static Migrator,
22 pool: Pool<DB>,
23 backoff: Backoff,
24 latch: Latch,
25 _spindown_token: AppSpindownToken,
26}
27
28impl<DB> MigrationsWorker<DB>
29where
30 DB: Database,
31 <DB as Database>::Connection: Migrate,
32{
33 pub async fn start(name: impl AsRef<str>, migrator: &'static Migrator, pool: Pool<DB>) -> Gate {
41 let name = Self::compose_name(name);
42 let backoff = Backoff::default();
43 let latch = Latch::new();
44 let gate = latch.gate();
45 let _spindown_token = AppSpindown::register(&name);
46
47 let worker = Self {
48 name,
49 migrator,
50 pool,
51 backoff,
52 latch,
53 _spindown_token,
54 };
55
56 tokio::spawn(worker.apply());
57
58 gate
59 }
60
61 fn compose_name(name: impl AsRef<str>) -> Arc<str> {
63 static COUNTER: AtomicUsize = AtomicUsize::new(0);
64
65 Arc::from(format!(
66 "database:migrations:{}:{}",
67 name.as_ref(),
68 COUNTER.fetch_add(1, Ordering::Relaxed),
69 ))
70 }
71
72 async fn apply(self) {
75 loop {
76 let state = select! {
77 biased;
78 _ = AppContext::terminated() => ServingState::Terminated,
79 result = self.migrator.run(&self.pool) => self.interpret_result(result).await,
80 };
81
82 if matches!(state, ServingState::Terminated) {
83 break;
84 }
85 }
86
87 }
89
90 async fn interpret_result(&self, result: Result<(), MigrateError>) -> ServingState {
92 match result {
93 Ok(_) => {
95 info!(
96 name = self.name.as_ref(),
97 "Successfully applied the database migrations",
98 );
99
100 self.latch.release();
102
103 ServingState::Terminated
105 }
106
107 Err(error) => {
109 self.report_error(&error);
111
112 select! {
114 biased;
115 _ = AppContext::terminated() => ServingState::Terminated,
116 _ = self.backoff.sleep_next() => ServingState::Ongoing,
117 }
118 }
119 }
120 }
121
122 fn report_error(&self, error: &MigrateError) {
124 if error.is_significant() {
125 error!(
126 alert = true,
127 name = self.name.as_ref(),
128 ?error,
129 error_message = %error,
130 "Failed to apply the database migrations",
131 );
132 } else {
133 warn!(
134 name = self.name.as_ref(),
135 ?error,
136 error_message = %error,
137 "Temporarily unable to apply the database migrations",
138 );
139 }
140 }
141}
142
/// State of the migrations worker after a single attempt.
enum ServingState {
    /// The worker should make another attempt.
    Ongoing,
    /// The worker should stop: either the migrations were applied or the
    /// application context was terminated.
    Terminated,
}
148
/// Classifies errors by whether they deserve to be reported as a real failure.
trait IsSignificant {
    /// Returns `true` when the error should be reported as a failure, and
    /// `false` when it should only be reported as a temporary condition.
    fn is_significant(&self) -> bool;
}
154
155impl IsSignificant for MigrateError {
156 fn is_significant(&self) -> bool {
157 match self {
158 MigrateError::Execute(error) => error.is_significant(),
159 _ => true,
160 }
161 }
162}
163
164impl IsSignificant for SqlxError {
165 fn is_significant(&self) -> bool {
166 match self {
167 SqlxError::PoolTimedOut => false, _ => true,
169 }
170 }
171}