1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction}; use crate::Migration; use async_trait::async_trait; use chrono::{DateTime, Local}; use mysql_async::{ prelude::Queryable, Error as MError, IsolationLevel, Pool, Transaction as MTransaction, TxOpts, }; async fn query_applied_migrations<'a>( mut transaction: MTransaction<'a>, query: &str, ) -> Result<(MTransaction<'a>, Vec<Migration>), MError> { let result = transaction.query(query).await?; let applied = result .into_iter() .map(|row| { let (version, name, applied_on, checksum): (i32, String, String, String) = mysql_async::from_row(row); let applied_on = DateTime::parse_from_rfc3339(&applied_on) .unwrap() .with_timezone(&Local); Migration::applied( version, name, applied_on, checksum .parse::<u64>() .expect("checksum must be a valid u64"), ) }) .collect(); Ok((transaction, applied)) } #[async_trait] impl AsyncTransaction for Pool { type Error = MError; async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> { let mut conn = self.get_conn().await?; let mut options = TxOpts::new(); options.with_isolation_level(Some(IsolationLevel::ReadCommitted)); let mut transaction = conn.start_transaction(options).await?; let mut count = 0; for query in queries { transaction.query_drop(query).await?; count += 1; } transaction.commit().await?; Ok(count as usize) } } #[async_trait] impl AsyncQuery<Vec<Migration>> for Pool { async fn query( &mut self, query: &str, ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> { let mut conn = self.get_conn().await?; let mut options = TxOpts::new(); options.with_isolation_level(Some(IsolationLevel::ReadCommitted)); let transaction = conn.start_transaction(options).await?; let (transaction, applied) = query_applied_migrations(transaction, query).await?; transaction.commit().await?; Ok(applied) } } impl AsyncMigrate for Pool {}