1use std::collections::BTreeMap;
2
3use bson::{Bson, Document};
4use futures::StreamExt;
5use mongodb::{options::UpdateOptions, results::InsertOneResult};
6
7use super::{
8 shell::Shell, with_connection::WithConnection, with_shell_config::WithShellConfig, Env,
9};
10use crate::{
11 error::MigrationExecution, migration::Migration, migration_record::MigrationRecord,
12 migration_status::MigrationStatus,
13};
14
15pub struct WithMigrationsVec {
16 pub with_shell_config: Option<WithShellConfig>,
17 pub with_connection: WithConnection,
18 pub migrations: Vec<Box<dyn Migration>>,
19}
20
21impl WithMigrationsVec {
22 fn get_not_executed_migrations_ids(&self, first_failed_migration_index: usize) -> Vec<String> {
23 if self.migrations.len() - 1 == first_failed_migration_index {
24 vec![]
25 } else {
26 self.migrations[first_failed_migration_index + 1..]
27 .iter()
28 .map(|m| m.get_id().to_string())
29 .collect::<Vec<_>>()
30 }
31 }
32
33 async fn get_migrations_ids_to_execute_from_index(&self, lookup_from: usize) -> Vec<String> {
34 if self.migrations.len() - 1 == lookup_from {
35 vec![]
36 } else {
37 let ids = self.migrations[lookup_from..]
38 .into_iter()
39 .map(|migration| migration.get_id().to_string())
40 .collect::<Vec<String>>();
41
42 let mut failed = self.with_connection
43 .db
44 .clone()
45 .collection("migrations")
46 .find(
47 bson::doc! {"_id": {"$in": ids.clone()}, "status": format!("{:?}", MigrationStatus::Fail)},
48 None,
49 )
50 .await.unwrap().collect::<Vec<_>>().await
51 .into_iter()
52 .map(|v| bson::from_bson(Bson::Document(v.unwrap())).unwrap())
54 .map(|v: MigrationRecord| v._id.to_string())
55 .collect::<Vec<String>>();
56
57 let all = self
59 .with_connection
60 .db
61 .clone()
62 .collection("migrations")
63 .find(bson::doc! {}, None)
64 .await
65 .unwrap()
66 .collect::<Vec<_>>()
67 .await
68 .into_iter()
69 .map(|v| bson::from_bson(Bson::Document(v.unwrap())).unwrap())
71 .map(|v: MigrationRecord| v._id.to_string())
72 .collect::<Vec<String>>();
73
74 failed.extend(ids.into_iter().filter(|id| !all.contains(&id)));
75 failed
76 }
77 }
78
79 pub async fn up(&self) -> Result<(), MigrationExecution> {
92 self.validate()?;
93
94 let ids = self.get_migrations_ids_to_execute_from_index(0).await;
96 tracing::info!(
97 message = "the following migrations are going to be executed",
98 ids = format!("{:?}", ids),
99 op = format!("{:?}", OperationType::Up)
100 );
101 for (i, migration) in self
102 .migrations
103 .iter()
104 .filter(|m| ids.contains(&m.get_id().to_string()))
105 .enumerate()
106 {
107 let r = self
108 .try_run_migration(migration, i, OperationType::Up)
109 .await;
110 self.trace_result(&migration, &r, OperationType::Up);
111
112 r?
113 }
114
115 Ok(())
116 }
117
118 pub async fn down(&self) -> Result<(), MigrationExecution> {
119 self.validate()?;
120
121 let ids = self.get_migrations_ids_to_execute_from_index(0).await;
122 tracing::info!(
123 message = "the following migrations are going to be executed",
124 ids = format!("{:?}", ids),
125 op = format!("{:?}", OperationType::Down)
126 );
127 for (i, migration) in self
128 .migrations
129 .iter()
130 .rev()
131 .filter(|m| ids.contains(&m.get_id().to_string()))
132 .enumerate()
133 {
134 let r = self
135 .try_run_migration(migration, i, OperationType::Down)
136 .await;
137 self.trace_result(&migration, &r, OperationType::Down);
138
139 r?
140 }
141
142 Ok(())
143 }
144
145 async fn save_not_executed_migrations(
146 &self,
147 save_from_index: usize,
148 ) -> Result<(), MigrationExecution> {
149 if self.migrations.len() - 1 == save_from_index {
150 return Ok(());
151 }
152
153 for (i, migration) in self.migrations[save_from_index..].iter().enumerate() {
154 let migration_record = MigrationRecord::migration_start(migration.get_id().to_string());
155 let migration_record = MigrationRecord::migration_failed(migration_record);
156 let serialized_to_document_migration_record = bson::to_document(&migration_record)
157 .map_err(|error| MigrationExecution::InitialMigrationRecord {
158 migration_id: migration.get_id().to_string(),
159 migration_record: migration_record.clone(),
160 next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
161 additional_info: error,
162 })?;
163
164 let mut u_o: UpdateOptions = Default::default();
165 u_o.upsert = Some(true);
166
167 self.with_connection
168 .db
169 .clone()
170 .collection::<MigrationRecord>("migrations")
171 .update_one(
172 bson::doc! {"_id": &migration_record._id},
173 bson::doc! {"$set": serialized_to_document_migration_record},
174 u_o,
175 )
176 .await
177 .map_err(
178 |error| MigrationExecution::FinishedButNotSavedDueMongoError {
179 migration_id: migration.get_id().to_string(),
180 migration_status: format!("{:?}", &migration_record.status),
181 additional_info: error,
182 next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
183 },
184 )?;
185 }
186
187 Ok(())
188 }
189
190 pub async fn up_single_from_vec(&self, migration_id: String) -> Result<(), MigrationExecution> {
192 self.validate()?;
193
194 let migration = self
195 .migrations
196 .iter()
197 .enumerate()
198 .find(|(_index, migration)| migration.get_id().to_string() == migration_id);
199
200 if migration.is_some() {
201 let (index, migration) = migration.unwrap();
202 let r = self
203 .try_run_migration(migration, index, OperationType::Up)
204 .await;
205 self.trace_result(&migration, &r, OperationType::Up);
206
207 r
208 } else {
209 Err(MigrationExecution::MigrationFromVecNotFound { migration_id })
210 }
211 }
212
213 pub async fn down_single_from_vec(
215 &self,
216 migration_id: String,
217 ) -> Result<(), MigrationExecution> {
218 self.validate()?;
219
220 let migration = self
221 .migrations
222 .iter()
223 .enumerate()
224 .find(|(_index, migration)| migration.get_id().to_string() == migration_id);
225
226 if migration.is_some() {
227 let (index, migration) = migration.unwrap();
228 let r = self
229 .try_run_migration(migration, index, OperationType::Down)
230 .await;
231 self.trace_result(&migration, &r, OperationType::Down);
232
233 r
234 } else {
235 Err(MigrationExecution::MigrationFromVecNotFound { migration_id })
236 }
237 }
238
239 fn validate(&self) -> Result<(), MigrationExecution> {
240 let mut entries = BTreeMap::new();
241 self.migrations
242 .iter()
243 .enumerate()
244 .for_each(|(index, migration)| {
245 let entry = entries
246 .entry(migration.get_id().to_string())
247 .or_insert(vec![]);
248 entry.push(index);
249 });
250
251 let duplicates = entries
252 .into_iter()
253 .filter(|(_id, indices)| indices.len() > 1)
254 .collect::<BTreeMap<String, Vec<usize>>>();
255
256 if duplicates.len() > 0 {
257 Err(MigrationExecution::PassedMigrationsWithDuplicatedIds { duplicates })
258 } else {
259 Ok(())
260 }
261 }
262
263 fn prepare_initial_migration_record(
264 &self,
265 migration: &Box<dyn Migration>,
266 i: usize,
267 ) -> Result<(Document, MigrationRecord), MigrationExecution> {
268 let migration_record = MigrationRecord::migration_start(migration.get_id().to_string());
269
270 Ok((
271 bson::to_document(&migration_record).map_err(|error| {
272 MigrationExecution::InitialMigrationRecord {
273 migration_id: migration.get_id().to_string(),
274 migration_record: migration_record.clone(),
275 next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
276 additional_info: error,
277 }
278 })?,
279 migration_record,
280 ))
281 }
282
283 async fn save_initial_migration_record(
284 &self,
285 migration: &Box<dyn Migration>,
286 serialized_to_document_migration_record: Document,
287 i: usize,
288 ) -> Result<InsertOneResult, MigrationExecution> {
289 Ok(self
290 .with_connection
291 .db
292 .clone()
293 .collection("migrations")
294 .insert_one(serialized_to_document_migration_record, None)
295 .await
296 .map_err(|error| MigrationExecution::InProgressStatusNotSaved {
297 migration_id: migration.get_id().to_string(),
298 additional_info: error,
299 next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
300 })?)
301 }
302
303 async fn save_executed_migration_record(
304 &self,
305 migration: &Box<dyn Migration>,
306 migration_record: &MigrationRecord,
307 serialized_to_document_migration_record: Document,
308 res: InsertOneResult,
309 i: usize,
310 ) -> Result<(), MigrationExecution> {
311 let mut u_o: UpdateOptions = Default::default();
312 u_o.upsert = Some(true);
313
314 self.with_connection
315 .db
316 .clone()
317 .collection::<MigrationRecord>("migrations")
318 .update_one(
319 bson::doc! {"_id": res.inserted_id},
320 bson::doc! {"$set": serialized_to_document_migration_record},
321 u_o,
322 )
323 .await
324 .map_err(
325 |error| MigrationExecution::FinishedButNotSavedDueMongoError {
326 migration_id: migration.get_id().to_string(),
327 migration_status: format!("{:?}", &migration_record.status),
328 additional_info: error,
329 next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
330 },
331 )?;
332
333 Ok(())
334 }
335
336 fn try_get_mongo_shell(&self) -> Option<Shell> {
337 if self.with_shell_config.is_some() {
338 Some(Shell {
339 config: self
340 .with_shell_config
341 .clone()
342 .expect("shell config is present")
343 .with_shell_config,
344 })
345 } else {
346 None
347 }
348 }
349
350 async fn up_migration(
351 &self,
352 migration: &Box<dyn Migration>,
353 shell: Option<Shell>,
354 migration_record: &MigrationRecord,
355 ) -> MigrationRecord {
356 migration
357 .clone()
358 .up(Env {
359 db: Some(self.with_connection.db.clone()),
360 shell,
361 ..Default::default()
362 })
363 .await
364 .map_or_else(
365 |_| migration_record.clone().migration_failed(),
366 |_| migration_record.clone().migration_succeeded(),
367 )
368 }
369
370 async fn down_migration(
371 &self,
372 migration: &Box<dyn Migration>,
373 shell: Option<Shell>,
374 migration_record: &MigrationRecord,
375 ) -> MigrationRecord {
376 migration
377 .clone()
378 .down(Env {
379 db: Some(self.with_connection.db.clone()),
380 shell,
381 ..Default::default()
382 })
383 .await
384 .map_or_else(
385 |_| migration_record.clone().migration_failed(),
386 |_| migration_record.clone().migration_succeeded(),
387 )
388 }
389
390 async fn try_run_migration(
391 &self,
392 migration: &Box<dyn Migration>,
393 i: usize,
394 operation_type: OperationType,
395 ) -> Result<(), MigrationExecution> {
396 tracing::info!(
397 id = migration.get_id(),
398 op = format!("{:?}", operation_type),
399 status = format!("{:?}", MigrationStatus::InProgress)
400 );
401
402 let (serialized_to_document_migration_record, migration_record) =
403 self.prepare_initial_migration_record(migration, i)?;
404
405 let res = self
406 .save_initial_migration_record(migration, serialized_to_document_migration_record, i)
407 .await?;
408
409 let shell = self.try_get_mongo_shell();
410
411 let migration_record = match operation_type {
412 OperationType::Up => self.up_migration(migration, shell, &migration_record).await,
413 OperationType::Down => {
414 self.down_migration(migration, shell, &migration_record)
415 .await
416 }
417 };
418
419 let serialized_to_document_migration_record = bson::to_document(&migration_record)
420 .map_err(
421 |error| MigrationExecution::FinishedButNotSavedDueToSerialization {
422 migration_id: migration.get_id().to_string(),
423 migration_status: format!("{:?}", &migration_record.status),
424 migration_record: migration_record.clone(),
425 next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
426 additional_info: error,
427 },
428 )?;
429
430 self.save_executed_migration_record(
431 migration,
432 &migration_record,
433 serialized_to_document_migration_record,
434 res,
435 i,
436 )
437 .await?;
438
439 if migration_record.status == MigrationStatus::Fail {
440 self.save_not_executed_migrations(i + 1).await?;
441 return Err(MigrationExecution::FinishedAndSavedAsFail {
442 migration_id: migration.get_id().to_string(),
443 next_not_executed_migrations_ids: self.get_not_executed_migrations_ids(i),
444 });
445 }
446
447 Ok(())
448 }
449
450 fn trace_result(
451 &self,
452 migration: &Box<dyn Migration>,
453 migration_result: &Result<(), MigrationExecution>,
454 operation_type: OperationType,
455 ) {
456 tracing::info!(
457 id = migration.get_id(),
458 op = format!("{:?}", operation_type),
459 status = format!(
460 "{:?}",
461 (if migration_result.is_ok() {
462 MigrationStatus::Success
463 } else {
464 MigrationStatus::Fail
465 })
466 )
467 );
468 }
469}
470
/// Direction in which a migration is executed.
///
/// The `Debug` representation (`Up` / `Down`) is what gets written to the logs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OperationType {
    /// Apply the migration.
    Up,
    /// Roll the migration back.
    Down,
}