db_up/lib.rs
1use std::error::Error;
2use std::fmt::{Display, Formatter};
3use std::sync::Arc;
4use async_trait::async_trait;
5
6pub use db_up_codegen::{ migrations };
7pub use db_up_sql_changelog::{Result as ChangelogResult, *};
8
9/// Kinds of errors produced by the migration code
10#[derive(Debug)]
11pub enum MigrationsErrorKind {
12 /// The migration failed during a single migration step.
13 ///
14 /// This will usually happen when the lines (steps) of a changelog file are processed.
15 MigrationDatabaseStepFailed(Option<Box<dyn Error + Send + Sync>>),
16
17 /// There was a general database-related problem
18 MigrationDatabaseFailed(Option<Box<dyn Error + Send + Sync>>),
19
20 /// Could not set up migration metadata
21 ///
22 /// This usually means that the migration state management could not be set up, e.g. because
23 /// there was an error while creating the migrations state table.
24 MigrationSetupFailed(Option<Box<dyn Error + Send + Sync>>),
25
26 /// There was a problem beginning/finishing a version
27 MigrationVersioningFailed(Option<Box<dyn Error + Send + Sync>>),
28
29 /// Some kind of error that has no specific representation
30 CustomErrorMessage(String, Option<Box<dyn Error + Send + Sync>>),
31}
32
33/// Represents errors produced by migration code
34#[derive(Debug)]
35pub struct MigrationsError {
36 /// The kind of error that occurred
37 kind: MigrationsErrorKind,
38
39 /// The last successfully deployed version
40 last_successful_version: Option<u32>,
41}
42
43impl MigrationsError {
44 pub fn migration_database_step_failed(last_successful_version: Option<u32>,
45 cause: Option<Box<dyn Error + Send + Sync>>) -> MigrationsError {
46 return MigrationsError {
47 kind: MigrationsErrorKind::MigrationDatabaseStepFailed(cause),
48 last_successful_version
49 };
50 }
51
52 pub fn migration_database_failed(last_successful_version: Option<u32>,
53 cause: Option<Box<dyn Error + Send + Sync>>) -> MigrationsError {
54 return MigrationsError {
55 kind: MigrationsErrorKind::MigrationDatabaseFailed(cause),
56 last_successful_version
57 };
58 }
59
60 pub fn migration_setup_failed(cause: Option<Box<dyn Error + Send + Sync>>) -> MigrationsError {
61 return MigrationsError {
62 kind: MigrationsErrorKind::MigrationSetupFailed(cause),
63 last_successful_version: None,
64 };
65 }
66
67 pub fn migration_versioning_failed(cause: Option<Box<dyn Error + Send + Sync>>) -> MigrationsError {
68 return MigrationsError {
69 kind: MigrationsErrorKind::MigrationVersioningFailed(cause),
70 last_successful_version: None,
71 };
72 }
73
74 pub fn custom_message(message: &str, last_successful_version: Option<u32>,
75 cause: Option<Box<dyn Error + Send + Sync>>) -> MigrationsError {
76 return MigrationsError {
77 kind: MigrationsErrorKind::CustomErrorMessage(message.to_string(), cause),
78 last_successful_version,
79 };
80 }
81
82 pub fn kind(&self) -> &MigrationsErrorKind {
83 &self.kind
84 }
85
86 pub fn last_successful_version(&self) -> Option<u32> {
87 self.last_successful_version
88 }
89}
90
91pub type Result<T> = std::result::Result<T, MigrationsError>;
92
93impl Display for MigrationsError {
94 fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result {
95 match &self.kind {
96 MigrationsErrorKind::MigrationDatabaseStepFailed(err_opt) => {
97 let mut result = write!(fmt, "Migration step failed.");
98 if err_opt.is_some() {
99 result = write!(fmt, "\nCaused by: {}", err_opt.as_ref().unwrap());
100 }
101 return result;
102 },
103 MigrationsErrorKind::MigrationDatabaseFailed(err_opt) => {
104 let mut result = write!(fmt, "Migration failed.");
105 if err_opt.is_some() {
106 result = write!(fmt, "\nCaused by: {}", err_opt.as_ref().unwrap());
107 }
108 return result;
109 },
110 MigrationsErrorKind::MigrationSetupFailed(err_opt) => {
111 let mut result = write!(fmt, "Migration setup failed.");
112 if err_opt.is_some() {
113 result = write!(fmt, "\nCaused by: {}", err_opt.as_ref().unwrap());
114 }
115 return result;
116 },
117 MigrationsErrorKind::MigrationVersioningFailed(err_opt) => {
118 let mut result = write!(fmt, "Migration versioning failed.");
119 if err_opt.is_some() {
120 result = write!(fmt, "\nCaused by: {}", err_opt.as_ref().unwrap());
121 }
122 return result;
123 },
124 MigrationsErrorKind::CustomErrorMessage(message, err_opt) => {
125 let mut result = write!(fmt, "{}", message.as_str());
126 if err_opt.is_some() {
127 result = write!(fmt, "\nCaused by: {}", err_opt.as_ref().unwrap());
128 }
129 return result;
130 }
131 };
132 }
133}
134
135impl Error for MigrationsError {
136 // fn source(&self) -> Option<&(dyn Error + 'static)> {
137 // match &self.kind {
138 // MigrationsErrorKind::MigrationDatabaseStepFailed(err_opt) => {
139 // if err_opt.is_some() {
140 // let err = err_opt.as_ref().unwrap();
141 // return Some(err);
142 // }
143 // },
144 // MigrationsErrorKind::MigrationDatabaseFailed(err_opt) => {
145 // if err_opt.is_some() {
146 // let err = err_opt.as_ref().unwrap();
147 // return Some(err);
148 // }
149 // },
150 // MigrationsErrorKind::MigrationSetupFailed(err_opt) => {
151 // if err_opt.is_some() {
152 // let err = err_opt.as_ref().unwrap();
153 // return Some(err);
154 // }
155 // },
156 // MigrationsErrorKind::MigrationVersioningFailed(err_opt) => {
157 // if err_opt.is_some() {
158 // let err = err_opt.as_ref().unwrap();
159 // return Some(err);
160 // }
161 // },
162 // // MigrationsErrorKind::MigrationQueryArgumentReplacementFailed(err_opt) => {
163 // // if err_opt.is_some() {
164 // // let err = err_opt.as_ref().unwrap();
165 // // return Some(err);
166 // // }
167 // // },
168 // };
169 // return None;
170 // }
171}
172
173/// Status of a migration.
174#[derive(Debug, Clone)]
175pub enum MigrationStatus {
176 /// Migration is in progress.
177 ///
178 /// The migration of this version has been started, but not finished yet. Depending on the
179 /// database driver and transaction management, this status may never actually land in the
180 /// database.
181 InProgress,
182
183 /// Migration has been finished.
184 Deployed,
185}
186
187/// The minimal information for a migration version
188#[derive(Debug, Clone)]
189pub struct MigrationState {
190 /// The version of the migration
191 pub version: u32,
192
193 /// The status of the migration
194 pub status: MigrationStatus,
195}
196
197/// Trait for state management
198///
199/// This should be implemented by DB drivers so that db-up can manage installed schema versions.
200#[async_trait]
201pub trait MigrationStateManager {
202 /// Prepare the DB for migration state management
203 ///
204 /// This will be called before any other methods to ensure that the dateabase is prepared
205 /// for state management. For most drivers, this method will simply ensure that a state
206 /// management table exists.
207 async fn prepare(&self) -> Result<()>;
208
209 /// Get the lowest deployed version
210 async fn lowest_version(&self) -> Result<Option<MigrationState>>;
211
212 /// Get the highest deployed version
213 async fn highest_version(&self) -> Result<Option<MigrationState>>;
214
215 /// Get a list of all deployed versions
216 async fn list_versions(&self) -> Result<Vec<MigrationState>>;
217
218 /// Begin a new version
219 async fn begin_version(&self, version: u32) -> Result<()>;
220
221 /// Finish a new version
222 ///
223 /// This will usually just set the status of the migration version to `Deployed`
224 async fn finish_version(&self, version: u32) -> Result<()>;
225}
226
227/// Trait for executing migrations
228///
229/// This should be implemented by DB drivers so that db-up can execute migrations on the
230/// database.
231#[async_trait]
232pub trait MigrationExecutor {
233 async fn begin_transaction(&self) -> Result<()>;
234 async fn execute_changelog_file(&self, changelog_file: ChangelogFile) -> Result<()>;
235 async fn commit_transaction(&self) -> Result<()>;
236 async fn rollback_transaction(&self) -> Result<()>;
237}
238
239/// Struct for running migrations on a database
240pub struct MigrationRunner<S, M, E> {
241 /// The migration store containing the changelog files
242 store: S,
243
244 /// The state manager
245 ///
246 /// This is an `Arc` so that the state manager and the executor can, but are not required
247 /// to be, the same object.
248 state_manager: Arc<M>,
249
250 /// The migration executor
251 ///
252 /// This is an `Arc` so that the state manager and the executor can, but are not required
253 /// to be, the same object.
254 executor: Arc<E>,
255}
256
257/// Struct storing the changelogs needed for the migrations
258///
259/// Implementations of this trait will usually be generated by the `migrations` macro, but can
260/// also be created manually.
261pub trait MigrationStore {
262 fn changelogs(&self) -> Vec<ChangelogFile>;
263}
264
265impl<S, M, E> MigrationRunner<S, M, E>
266 where S: MigrationStore,
267 M: MigrationStateManager,
268 E: MigrationExecutor {
269
270 /// Create a new `MigrationRunner`
271 pub fn new(store: S, state_manager: Arc<M>, executor: Arc<E>) -> Self {
272 return Self {
273 store, state_manager, executor
274 };
275 }
276
277 /// Migrate with a separate transaction for each changelog
278 ///
279 /// This will execute each migration inside its own DB transaction. Therefore, if an error
280 /// occurs and the method returns prematurely, all versions that have been successfully
281 /// deployed will stay in the database.
282 pub async fn migrate(&self) -> Result<Option<u32>> {
283 self.state_manager.prepare().await?;
284 let mut current_highest_version = self.state_manager.highest_version()
285 .await?
286 .map(|state| state.version);
287 let mut migrations: Vec<ChangelogFile> = self.store.changelogs().into_iter()
288 .filter(|migration| {
289 let version: u32 = migration.version()
290 .parse()
291 .expect("Version must be an integer");
292 return current_highest_version.map(|highest_version| version > highest_version)
293 .or(Some(true))
294 .unwrap();
295 })
296 .collect::<Vec<ChangelogFile>>();
297 println!("sorting migrations ...");
298 migrations.sort_by(|a, b| a.version().cmp(b.version()));
299 let migrations = migrations;
300
301 println!("running migrations ... {:?}", &migrations);
302 for changelog in migrations.into_iter() {
303 let version: u32 = changelog.version().parse().unwrap();
304
305 self.state_manager.begin_version(version).await?;
306 self.executor.begin_transaction().await?;
307 let result = self.executor
308 .execute_changelog_file(changelog)
309 .await;
310
311 match result {
312 Ok(_) => {
313 self.executor.commit_transaction().await?;
314 self.state_manager.finish_version(version).await?;
315 current_highest_version = Some(version);
316 },
317 Err(err) => {
318 let _result = self.executor.rollback_transaction().await
319 .or::<MigrationsError>(Ok(()))
320 .unwrap();
321 return Err(err);
322 }
323 }
324 }
325
326 return Ok(current_highest_version);
327 }
328
329 // /// Migrate with a single transaction for all changelogs
330 //
331 // /// This will execute all migrations inside one big DB transaction. Therefore, if an error
332 // /// occurs and the method returns prematurely, none of the changes will stay inside
333 // /// the database.
334 // pub async fn migrate_single_transaction(&self) -> Result<Option<u32>> {
335 // self.state_manager.prepare().await?;
336 // let mut current_highest_version = self.state_manager.highest_version()
337 // .await?
338 // .map(|state| state.version);
339 // let mut migrations: Vec<ChangelogFile> = self.store.changelogs().into_iter()
340 // .filter(|migration| {
341 // let version: u32 = migration.version()
342 // .parse()
343 // .expect("Version must be an integer");
344 // return current_highest_version.map(|highest_version| version > highest_version)
345 // .or(Some(true))
346 // .unwrap();
347 // })
348 // .collect::<Vec<ChangelogFile>>();
349 // migrations.sort_by(|a, b| a.version().cmp(b.version()));
350 // let migrations = migrations;
351 //
352 // self.executor.begin_transaction().await?;
353 // for changelog in migrations.into_iter() {
354 // let version: u32 = changelog.version().parse().unwrap();
355 //
356 // let result = self.executor
357 // .execute_changelog_file(changelog)
358 // .await;
359 // match result {
360 // Ok(_) => {
361 // current_highest_version = Some(version);
362 // },
363 // Err(err) => {
364 // self.executor.rollback_transaction();
365 // return Err(err);
366 // }
367 // }
368 // }
369 // self.executor.commit_transaction().await?;
370 //
371 // return Ok(current_highest_version);
372 // }
373}