db_up/
lib.rs

1use std::error::Error;
2use std::fmt::{Display, Formatter};
3use std::sync::Arc;
4use async_trait::async_trait;
5
6pub use db_up_codegen::{ migrations };
7pub use db_up_sql_changelog::{Result as ChangelogResult, *};
8
9/// Kinds of errors produced by the migration code
10#[derive(Debug)]
11pub enum MigrationsErrorKind {
12    /// The migration failed during a single migration step.
13    ///
14    /// This will usually happen when the lines (steps) of a changelog file are processed.
15    MigrationDatabaseStepFailed(Option<Box<dyn Error + Send + Sync>>),
16
17    /// There was a general database-related problem
18    MigrationDatabaseFailed(Option<Box<dyn Error + Send + Sync>>),
19
20    /// Could not set up migration metadata
21    ///
22    /// This usually means that the migration state management could not be set up, e.g. because
23    /// there was an error while creating the migrations state table.
24    MigrationSetupFailed(Option<Box<dyn Error + Send + Sync>>),
25
26    /// There was a problem beginning/finishing a version
27    MigrationVersioningFailed(Option<Box<dyn Error + Send + Sync>>),
28
29    /// Some kind of error that has no specific representation
30    CustomErrorMessage(String, Option<Box<dyn Error + Send + Sync>>),
31}
32
33/// Represents errors produced by migration code
34#[derive(Debug)]
35pub struct MigrationsError {
36    /// The kind of error that occurred
37    kind: MigrationsErrorKind,
38
39    /// The last successfully deployed version
40    last_successful_version: Option<u32>,
41}
42
43impl MigrationsError {
44    pub fn migration_database_step_failed(last_successful_version: Option<u32>,
45                                          cause: Option<Box<dyn Error + Send + Sync>>) -> MigrationsError {
46        return MigrationsError {
47            kind: MigrationsErrorKind::MigrationDatabaseStepFailed(cause),
48            last_successful_version
49        };
50    }
51
52    pub fn migration_database_failed(last_successful_version: Option<u32>,
53                                     cause: Option<Box<dyn Error + Send + Sync>>) -> MigrationsError {
54        return MigrationsError {
55            kind: MigrationsErrorKind::MigrationDatabaseFailed(cause),
56            last_successful_version
57        };
58    }
59
60    pub fn migration_setup_failed(cause: Option<Box<dyn Error + Send + Sync>>) -> MigrationsError {
61        return MigrationsError {
62            kind: MigrationsErrorKind::MigrationSetupFailed(cause),
63            last_successful_version: None,
64        };
65    }
66
67    pub fn migration_versioning_failed(cause: Option<Box<dyn Error + Send + Sync>>) -> MigrationsError {
68        return MigrationsError {
69            kind: MigrationsErrorKind::MigrationVersioningFailed(cause),
70            last_successful_version: None,
71        };
72    }
73
74    pub fn custom_message(message: &str, last_successful_version: Option<u32>,
75                          cause: Option<Box<dyn Error + Send + Sync>>) -> MigrationsError {
76        return MigrationsError {
77            kind: MigrationsErrorKind::CustomErrorMessage(message.to_string(), cause),
78            last_successful_version,
79        };
80    }
81
82    pub fn kind(&self) -> &MigrationsErrorKind {
83        &self.kind
84    }
85
86    pub fn last_successful_version(&self) -> Option<u32> {
87        self.last_successful_version
88    }
89}
90
91pub type Result<T> = std::result::Result<T, MigrationsError>;
92
93impl Display for MigrationsError {
94    fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result {
95        match &self.kind {
96            MigrationsErrorKind::MigrationDatabaseStepFailed(err_opt) => {
97                let mut result = write!(fmt, "Migration step failed.");
98                if err_opt.is_some() {
99                    result = write!(fmt, "\nCaused by: {}", err_opt.as_ref().unwrap());
100                }
101                return result;
102            },
103            MigrationsErrorKind::MigrationDatabaseFailed(err_opt) => {
104                let mut result = write!(fmt, "Migration failed.");
105                if err_opt.is_some() {
106                    result = write!(fmt, "\nCaused by: {}", err_opt.as_ref().unwrap());
107                }
108                return result;
109            },
110            MigrationsErrorKind::MigrationSetupFailed(err_opt) => {
111                let mut result = write!(fmt, "Migration setup failed.");
112                if err_opt.is_some() {
113                    result = write!(fmt, "\nCaused by: {}", err_opt.as_ref().unwrap());
114                }
115                return result;
116            },
117            MigrationsErrorKind::MigrationVersioningFailed(err_opt) => {
118                let mut result = write!(fmt, "Migration versioning failed.");
119                if err_opt.is_some() {
120                    result = write!(fmt, "\nCaused by: {}", err_opt.as_ref().unwrap());
121                }
122                return result;
123            },
124            MigrationsErrorKind::CustomErrorMessage(message, err_opt) => {
125                let mut result = write!(fmt, "{}", message.as_str());
126                if err_opt.is_some() {
127                    result = write!(fmt, "\nCaused by: {}", err_opt.as_ref().unwrap());
128                }
129                return result;
130            }
131        };
132    }
133}
134
135impl Error for MigrationsError {
136    // fn source(&self) -> Option<&(dyn Error + 'static)> {
137    //     match &self.kind {
138    //         MigrationsErrorKind::MigrationDatabaseStepFailed(err_opt) => {
139    //             if err_opt.is_some() {
140    //                 let err = err_opt.as_ref().unwrap();
141    //                 return Some(err);
142    //             }
143    //         },
144    //         MigrationsErrorKind::MigrationDatabaseFailed(err_opt) => {
145    //             if err_opt.is_some() {
146    //                 let err = err_opt.as_ref().unwrap();
147    //                 return Some(err);
148    //             }
149    //         },
150    //         MigrationsErrorKind::MigrationSetupFailed(err_opt) => {
151    //             if err_opt.is_some() {
152    //                 let err = err_opt.as_ref().unwrap();
153    //                 return Some(err);
154    //             }
155    //         },
156    //         MigrationsErrorKind::MigrationVersioningFailed(err_opt) => {
157    //             if err_opt.is_some() {
158    //                 let err = err_opt.as_ref().unwrap();
159    //                 return Some(err);
160    //             }
161    //         },
162    //         // MigrationsErrorKind::MigrationQueryArgumentReplacementFailed(err_opt) => {
163    //         //     if err_opt.is_some() {
164    //         //         let err = err_opt.as_ref().unwrap();
165    //         //         return Some(err);
166    //         //     }
167    //         // },
168    //     };
169    //     return None;
170    // }
171}
172
173/// Status of a migration.
174#[derive(Debug, Clone)]
175pub enum MigrationStatus {
176    /// Migration is in progress.
177    ///
178    /// The migration of this version has been started, but not finished yet. Depending on the
179    /// database driver and transaction management, this status may never actually land in the
180    /// database.
181    InProgress,
182
183    /// Migration has been finished.
184    Deployed,
185}
186
187/// The minimal information for a migration version
188#[derive(Debug, Clone)]
189pub struct MigrationState {
190    /// The version of the migration
191    pub version: u32,
192
193    /// The status of the migration
194    pub status: MigrationStatus,
195}
196
197/// Trait for state management
198///
199/// This should be implemented by DB drivers so that db-up can manage installed schema versions.
200#[async_trait]
201pub trait MigrationStateManager {
202    /// Prepare the DB for migration state management
203    ///
204    /// This will be called before any other methods to ensure that the dateabase is prepared
205    /// for state management. For most drivers, this method will simply ensure that a state
206    /// management table exists.
207    async fn prepare(&self) -> Result<()>;
208
209    /// Get the lowest deployed version
210    async fn lowest_version(&self) -> Result<Option<MigrationState>>;
211
212    /// Get the highest deployed version
213    async fn highest_version(&self) -> Result<Option<MigrationState>>;
214
215    /// Get a list of all deployed versions
216    async fn list_versions(&self) -> Result<Vec<MigrationState>>;
217
218    /// Begin a new version
219    async fn begin_version(&self, version: u32) -> Result<()>;
220
221    /// Finish a new version
222    ///
223    /// This will usually just set the status of the migration version to `Deployed`
224    async fn finish_version(&self, version: u32) -> Result<()>;
225}
226
227/// Trait for executing migrations
228///
229/// This should be implemented by DB drivers so that db-up can execute migrations on the
230/// database.
231#[async_trait]
232pub trait MigrationExecutor {
233    async fn begin_transaction(&self) -> Result<()>;
234    async fn execute_changelog_file(&self, changelog_file: ChangelogFile) -> Result<()>;
235    async fn commit_transaction(&self) -> Result<()>;
236    async fn rollback_transaction(&self) -> Result<()>;
237}
238
239/// Struct for running migrations on a database
240pub struct MigrationRunner<S, M, E> {
241    /// The migration store containing the changelog files
242    store: S,
243
244    /// The state manager
245    ///
246    /// This is an `Arc` so that the state manager and the executor can, but are not required
247    /// to be, the same object.
248    state_manager: Arc<M>,
249
250    /// The migration executor
251    ///
252    /// This is an `Arc` so that the state manager and the executor can, but are not required
253    /// to be, the same object.
254    executor: Arc<E>,
255}
256
257/// Struct storing the changelogs needed for the migrations
258///
259/// Implementations of this trait will usually be generated by the `migrations` macro, but can
260/// also be created manually.
261pub trait MigrationStore {
262    fn changelogs(&self) -> Vec<ChangelogFile>;
263}
264
265impl<S, M, E> MigrationRunner<S, M, E>
266    where S: MigrationStore,
267          M: MigrationStateManager,
268          E: MigrationExecutor {
269
270    /// Create a new `MigrationRunner`
271    pub fn new(store: S, state_manager: Arc<M>, executor: Arc<E>) -> Self {
272        return Self {
273            store, state_manager, executor
274        };
275    }
276
277    /// Migrate with a separate transaction for each changelog
278    ///
279    /// This will execute each migration inside its own DB transaction. Therefore, if an error
280    /// occurs and the method returns prematurely, all versions that have been successfully
281    /// deployed will stay in the database.
282    pub async fn migrate(&self) -> Result<Option<u32>> {
283        self.state_manager.prepare().await?;
284        let mut current_highest_version = self.state_manager.highest_version()
285            .await?
286            .map(|state| state.version);
287        let mut migrations: Vec<ChangelogFile> = self.store.changelogs().into_iter()
288            .filter(|migration| {
289                let version: u32 = migration.version()
290                    .parse()
291                    .expect("Version must be an integer");
292                return current_highest_version.map(|highest_version| version > highest_version)
293                    .or(Some(true))
294                    .unwrap();
295            })
296            .collect::<Vec<ChangelogFile>>();
297        println!("sorting migrations ...");
298        migrations.sort_by(|a, b| a.version().cmp(b.version()));
299        let migrations = migrations;
300
301        println!("running migrations ... {:?}", &migrations);
302        for changelog in migrations.into_iter() {
303            let version: u32 = changelog.version().parse().unwrap();
304
305            self.state_manager.begin_version(version).await?;
306            self.executor.begin_transaction().await?;
307            let result = self.executor
308                .execute_changelog_file(changelog)
309                .await;
310
311            match result {
312                Ok(_) => {
313                    self.executor.commit_transaction().await?;
314                    self.state_manager.finish_version(version).await?;
315                    current_highest_version = Some(version);
316                },
317                Err(err) => {
318                    let _result = self.executor.rollback_transaction().await
319                        .or::<MigrationsError>(Ok(()))
320                        .unwrap();
321                    return Err(err);
322                }
323            }
324        }
325
326        return Ok(current_highest_version);
327    }
328
329    // /// Migrate with a single transaction for all changelogs
330    //
331    // /// This will execute all migrations inside one big DB transaction. Therefore, if an error
332    // /// occurs and the method returns prematurely, none of the changes will stay inside
333    // /// the database.
334    // pub async fn migrate_single_transaction(&self) -> Result<Option<u32>> {
335    //     self.state_manager.prepare().await?;
336    //     let mut current_highest_version = self.state_manager.highest_version()
337    //         .await?
338    //         .map(|state| state.version);
339    //     let mut migrations: Vec<ChangelogFile> = self.store.changelogs().into_iter()
340    //         .filter(|migration| {
341    //             let version: u32 = migration.version()
342    //                 .parse()
343    //                 .expect("Version must be an integer");
344    //             return current_highest_version.map(|highest_version| version > highest_version)
345    //                 .or(Some(true))
346    //                 .unwrap();
347    //         })
348    //         .collect::<Vec<ChangelogFile>>();
349    //     migrations.sort_by(|a, b| a.version().cmp(b.version()));
350    //     let migrations = migrations;
351    //
352    //     self.executor.begin_transaction().await?;
353    //     for changelog in migrations.into_iter() {
354    //         let version: u32 = changelog.version().parse().unwrap();
355    //
356    //         let result = self.executor
357    //             .execute_changelog_file(changelog)
358    //             .await;
359    //         match result {
360    //             Ok(_) => {
361    //                 current_highest_version = Some(version);
362    //             },
363    //             Err(err) => {
364    //                 self.executor.rollback_transaction();
365    //                 return Err(err);
366    //             }
367    //         }
368    //     }
369    //     self.executor.commit_transaction().await?;
370    //
371    //     return Ok(current_highest_version);
372    // }
373}