mongodb_migrator/migrator/
with_migrations_vec.rs

1use std::collections::BTreeMap;
2
3use bson::{Bson, Document};
4use futures::StreamExt;
5use mongodb::{options::UpdateOptions, results::InsertOneResult};
6
7use super::{
8    shell::Shell, with_connection::WithConnection, with_shell_config::WithShellConfig, Env,
9};
10use crate::{
11    error::MigrationExecution, migration::Migration, migration_record::MigrationRecord,
12    migration_status::MigrationStatus,
13};
14
15pub struct WithMigrationsVec {
16    pub with_shell_config: Option<WithShellConfig>,
17    pub with_connection: WithConnection,
18    pub migrations: Vec<Box<dyn Migration>>,
19}
20
21impl WithMigrationsVec {
22    fn get_not_executed_migrations_ids(&self, first_failed_migration_index: usize) -> Vec<String> {
23        if self.migrations.len() - 1 == first_failed_migration_index {
24            vec![]
25        } else {
26            self.migrations[first_failed_migration_index + 1..]
27                .iter()
28                .map(|m| m.get_id().to_string())
29                .collect::<Vec<_>>()
30        }
31    }
32
33    async fn get_migrations_ids_to_execute_from_index(&self, lookup_from: usize) -> Vec<String> {
34        if self.migrations.len() - 1 == lookup_from {
35            vec![]
36        } else {
37            let ids = self.migrations[lookup_from..]
38                .into_iter()
39                .map(|migration| migration.get_id().to_string())
40                .collect::<Vec<String>>();
41
42            let mut failed = self.with_connection
43                .db
44                .clone()
45                .collection("migrations")
46                .find(
47                    bson::doc! {"_id": {"$in": ids.clone()}, "status": format!("{:?}", MigrationStatus::Fail)},
48                    None,
49                )
50		.await.unwrap().collect::<Vec<_>>().await
51		.into_iter()
52		// TODO(koc_kakoc): replace unwrap?
53		.map(|v| bson::from_bson(Bson::Document(v.unwrap())).unwrap())
54		.map(|v: MigrationRecord| v._id.to_string())
55		.collect::<Vec<String>>();
56
57            // TODO(koc_kakoc): use Set
58            let all = self
59                .with_connection
60                .db
61                .clone()
62                .collection("migrations")
63                .find(bson::doc! {}, None)
64                .await
65                .unwrap()
66                .collect::<Vec<_>>()
67                .await
68                .into_iter()
69                // TODO(koc_kakoc): replace unwrap?
70                .map(|v| bson::from_bson(Bson::Document(v.unwrap())).unwrap())
71                .map(|v: MigrationRecord| v._id.to_string())
72                .collect::<Vec<String>>();
73
74            failed.extend(ids.into_iter().filter(|id| !all.contains(&id)));
75            failed
76        }
77    }
78
79    /// This function executes all passed migrations in the passed order
80    /// for migration in migrations
81    ///   createInProgressBson
82    ///   handleIfFailed
83    ///   saveInMongoAsInProgress
84    ///   handleIfResultWasntSaved
85    ///   up
86    ///   createFinishedBson
87    ///   handleIfFailed
88    ///   saveInMongoAsFinished
89    ///   handleIfResultWasntSaved
90    ///   returnIfMigrationUpWithFailedResultWithAllNextSavedAsFail
91    pub async fn up(&self) -> Result<(), MigrationExecution> {
92        self.validate()?;
93
94        // TODO(koc_kakoc): execute only failed or not stored in migrations collections
95        let ids = self.get_migrations_ids_to_execute_from_index(0).await;
96        tracing::info!(
97            message = "the following migrations are going to be executed",
98            ids = format!("{:?}", ids),
99            op = format!("{:?}", OperationType::Up)
100        );
101        for (i, migration) in self
102            .migrations
103            .iter()
104            .filter(|m| ids.contains(&m.get_id().to_string()))
105            .enumerate()
106        {
107            let r = self
108                .try_run_migration(migration, i, OperationType::Up)
109                .await;
110            self.trace_result(&migration, &r, OperationType::Up);
111
112            r?
113        }
114
115        Ok(())
116    }
117
118    pub async fn down(&self) -> Result<(), MigrationExecution> {
119        self.validate()?;
120
121        let ids = self.get_migrations_ids_to_execute_from_index(0).await;
122        tracing::info!(
123            message = "the following migrations are going to be executed",
124            ids = format!("{:?}", ids),
125            op = format!("{:?}", OperationType::Down)
126        );
127        for (i, migration) in self
128            .migrations
129            .iter()
130            .rev()
131            .filter(|m| ids.contains(&m.get_id().to_string()))
132            .enumerate()
133        {
134            let r = self
135                .try_run_migration(migration, i, OperationType::Down)
136                .await;
137            self.trace_result(&migration, &r, OperationType::Down);
138
139            r?
140        }
141
142        Ok(())
143    }
144
145    async fn save_not_executed_migrations(
146        &self,
147        save_from_index: usize,
148    ) -> Result<(), MigrationExecution> {
149        if self.migrations.len() - 1 == save_from_index {
150            return Ok(());
151        }
152
153        for (i, migration) in self.migrations[save_from_index..].iter().enumerate() {
154            let migration_record = MigrationRecord::migration_start(migration.get_id().to_string());
155            let migration_record = MigrationRecord::migration_failed(migration_record);
156            let serialized_to_document_migration_record = bson::to_document(&migration_record)
157                .map_err(|error| MigrationExecution::InitialMigrationRecord {
158                    migration_id: migration.get_id().to_string(),
159                    migration_record: migration_record.clone(),
160                    next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
161                    additional_info: error,
162                })?;
163
164            let mut u_o: UpdateOptions = Default::default();
165            u_o.upsert = Some(true);
166
167            self.with_connection
168                .db
169                .clone()
170                .collection::<MigrationRecord>("migrations")
171                .update_one(
172                    bson::doc! {"_id": &migration_record._id},
173                    bson::doc! {"$set": serialized_to_document_migration_record},
174                    u_o,
175                )
176                .await
177                .map_err(
178                    |error| MigrationExecution::FinishedButNotSavedDueMongoError {
179                        migration_id: migration.get_id().to_string(),
180                        migration_status: format!("{:?}", &migration_record.status),
181                        additional_info: error,
182                        next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
183                    },
184                )?;
185        }
186
187        Ok(())
188    }
189
190    /// Tries to up a migration from the passed before vec
191    pub async fn up_single_from_vec(&self, migration_id: String) -> Result<(), MigrationExecution> {
192        self.validate()?;
193
194        let migration = self
195            .migrations
196            .iter()
197            .enumerate()
198            .find(|(_index, migration)| migration.get_id().to_string() == migration_id);
199
200        if migration.is_some() {
201            let (index, migration) = migration.unwrap();
202            let r = self
203                .try_run_migration(migration, index, OperationType::Up)
204                .await;
205            self.trace_result(&migration, &r, OperationType::Up);
206
207            r
208        } else {
209            Err(MigrationExecution::MigrationFromVecNotFound { migration_id })
210        }
211    }
212
213    /// Tries do rollback a migration from the bassed before vec
214    pub async fn down_single_from_vec(
215        &self,
216        migration_id: String,
217    ) -> Result<(), MigrationExecution> {
218        self.validate()?;
219
220        let migration = self
221            .migrations
222            .iter()
223            .enumerate()
224            .find(|(_index, migration)| migration.get_id().to_string() == migration_id);
225
226        if migration.is_some() {
227            let (index, migration) = migration.unwrap();
228            let r = self
229                .try_run_migration(migration, index, OperationType::Down)
230                .await;
231            self.trace_result(&migration, &r, OperationType::Down);
232
233            r
234        } else {
235            Err(MigrationExecution::MigrationFromVecNotFound { migration_id })
236        }
237    }
238
239    fn validate(&self) -> Result<(), MigrationExecution> {
240        let mut entries = BTreeMap::new();
241        self.migrations
242            .iter()
243            .enumerate()
244            .for_each(|(index, migration)| {
245                let entry = entries
246                    .entry(migration.get_id().to_string())
247                    .or_insert(vec![]);
248                entry.push(index);
249            });
250
251        let duplicates = entries
252            .into_iter()
253            .filter(|(_id, indices)| indices.len() > 1)
254            .collect::<BTreeMap<String, Vec<usize>>>();
255
256        if duplicates.len() > 0 {
257            Err(MigrationExecution::PassedMigrationsWithDuplicatedIds { duplicates })
258        } else {
259            Ok(())
260        }
261    }
262
263    fn prepare_initial_migration_record(
264        &self,
265        migration: &Box<dyn Migration>,
266        i: usize,
267    ) -> Result<(Document, MigrationRecord), MigrationExecution> {
268        let migration_record = MigrationRecord::migration_start(migration.get_id().to_string());
269
270        Ok((
271            bson::to_document(&migration_record).map_err(|error| {
272                MigrationExecution::InitialMigrationRecord {
273                    migration_id: migration.get_id().to_string(),
274                    migration_record: migration_record.clone(),
275                    next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
276                    additional_info: error,
277                }
278            })?,
279            migration_record,
280        ))
281    }
282
283    async fn save_initial_migration_record(
284        &self,
285        migration: &Box<dyn Migration>,
286        serialized_to_document_migration_record: Document,
287        i: usize,
288    ) -> Result<InsertOneResult, MigrationExecution> {
289        Ok(self
290            .with_connection
291            .db
292            .clone()
293            .collection("migrations")
294            .insert_one(serialized_to_document_migration_record, None)
295            .await
296            .map_err(|error| MigrationExecution::InProgressStatusNotSaved {
297                migration_id: migration.get_id().to_string(),
298                additional_info: error,
299                next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
300            })?)
301    }
302
303    async fn save_executed_migration_record(
304        &self,
305        migration: &Box<dyn Migration>,
306        migration_record: &MigrationRecord,
307        serialized_to_document_migration_record: Document,
308        res: InsertOneResult,
309        i: usize,
310    ) -> Result<(), MigrationExecution> {
311        let mut u_o: UpdateOptions = Default::default();
312        u_o.upsert = Some(true);
313
314        self.with_connection
315            .db
316            .clone()
317            .collection::<MigrationRecord>("migrations")
318            .update_one(
319                bson::doc! {"_id": res.inserted_id},
320                bson::doc! {"$set": serialized_to_document_migration_record},
321                u_o,
322            )
323            .await
324            .map_err(
325                |error| MigrationExecution::FinishedButNotSavedDueMongoError {
326                    migration_id: migration.get_id().to_string(),
327                    migration_status: format!("{:?}", &migration_record.status),
328                    additional_info: error,
329                    next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
330                },
331            )?;
332
333        Ok(())
334    }
335
336    fn try_get_mongo_shell(&self) -> Option<Shell> {
337        if self.with_shell_config.is_some() {
338            Some(Shell {
339                config: self
340                    .with_shell_config
341                    .clone()
342                    .expect("shell config is present")
343                    .with_shell_config,
344            })
345        } else {
346            None
347        }
348    }
349
350    async fn up_migration(
351        &self,
352        migration: &Box<dyn Migration>,
353        shell: Option<Shell>,
354        migration_record: &MigrationRecord,
355    ) -> MigrationRecord {
356        migration
357            .clone()
358            .up(Env {
359                db: Some(self.with_connection.db.clone()),
360                shell,
361                ..Default::default()
362            })
363            .await
364            .map_or_else(
365                |_| migration_record.clone().migration_failed(),
366                |_| migration_record.clone().migration_succeeded(),
367            )
368    }
369
370    async fn down_migration(
371        &self,
372        migration: &Box<dyn Migration>,
373        shell: Option<Shell>,
374        migration_record: &MigrationRecord,
375    ) -> MigrationRecord {
376        migration
377            .clone()
378            .down(Env {
379                db: Some(self.with_connection.db.clone()),
380                shell,
381                ..Default::default()
382            })
383            .await
384            .map_or_else(
385                |_| migration_record.clone().migration_failed(),
386                |_| migration_record.clone().migration_succeeded(),
387            )
388    }
389
390    async fn try_run_migration(
391        &self,
392        migration: &Box<dyn Migration>,
393        i: usize,
394        operation_type: OperationType,
395    ) -> Result<(), MigrationExecution> {
396        tracing::info!(
397            id = migration.get_id(),
398            op = format!("{:?}", operation_type),
399            status = format!("{:?}", MigrationStatus::InProgress)
400        );
401
402        let (serialized_to_document_migration_record, migration_record) =
403            self.prepare_initial_migration_record(migration, i)?;
404
405        let res = self
406            .save_initial_migration_record(migration, serialized_to_document_migration_record, i)
407            .await?;
408
409        let shell = self.try_get_mongo_shell();
410
411        let migration_record = match operation_type {
412            OperationType::Up => self.up_migration(migration, shell, &migration_record).await,
413            OperationType::Down => {
414                self.down_migration(migration, shell, &migration_record)
415                    .await
416            }
417        };
418
419        let serialized_to_document_migration_record = bson::to_document(&migration_record)
420            .map_err(
421                |error| MigrationExecution::FinishedButNotSavedDueToSerialization {
422                    migration_id: migration.get_id().to_string(),
423                    migration_status: format!("{:?}", &migration_record.status),
424                    migration_record: migration_record.clone(),
425                    next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
426                    additional_info: error,
427                },
428            )?;
429
430        self.save_executed_migration_record(
431            migration,
432            &migration_record,
433            serialized_to_document_migration_record,
434            res,
435            i,
436        )
437        .await?;
438
439        if migration_record.status == MigrationStatus::Fail {
440            self.save_not_executed_migrations(i + 1).await?;
441            return Err(MigrationExecution::FinishedAndSavedAsFail {
442                migration_id: migration.get_id().to_string(),
443                next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
444            });
445        }
446
447        Ok(())
448    }
449
450    fn trace_result(
451        &self,
452        migration: &Box<dyn Migration>,
453        migration_result: &Result<(), MigrationExecution>,
454        operation_type: OperationType,
455    ) {
456        tracing::info!(
457            id = migration.get_id(),
458            op = format!("{:?}", operation_type),
459            status = format!(
460                "{:?}",
461                (if migration_result.is_ok() {
462                    MigrationStatus::Success
463                } else {
464                    MigrationStatus::Fail
465                })
466            )
467        );
468    }
469}
470
471#[derive(Debug)]
472enum OperationType {
473    Up,
474    Down,
475}