1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
use crate::traits::sync::{Migrate, Query, Transaction};
use crate::Migration;
use chrono::{DateTime, Local};
use mysql::{
    error::Error as MError, prelude::Queryable, Conn, IsolationLevel, PooledConn,
    Transaction as MTransaction, TxOpts,
};

/// Builds the transaction options shared by every transaction in this module:
/// consistent snapshot enabled, no explicit access mode, and the
/// `RepeatableRead` isolation level.
fn get_tx_opts() -> TxOpts {
    let opts = TxOpts::default();
    let opts = opts.set_with_consistent_snapshot(true);
    let opts = opts.set_access_mode(None);
    opts.set_isolation_level(Some(IsolationLevel::RepeatableRead))
}

/// Runs `query` inside `transaction` and turns every returned row into an
/// applied [`Migration`].
///
/// Columns are read by position: `0` is the version, `1` is forwarded as the
/// second argument of `Migration::applied` (presumably the migration name —
/// confirm against its definition), `2` is an RFC 3339 timestamp string and
/// `3` is the checksum stored as a decimal string.
///
/// # Errors
///
/// Returns the driver error if the query itself or fetching any row fails;
/// processing stops at the first failing row.
///
/// # Panics
///
/// Panics if any of the four columns is missing or cannot be converted, if the
/// timestamp is not valid RFC 3339, or if the checksum is not a valid `u64`.
fn query_applied_migrations(
    transaction: &mut MTransaction,
    query: &str,
) -> Result<Vec<Migration>, MError> {
    transaction
        .query_iter(query)?
        .map(|fetched| -> Result<Migration, MError> {
            let row = fetched?;

            // The column reads below are kept in this exact order so that the
            // first panic on a malformed row stays the same.
            let version = row.get(0).unwrap();

            let applied_on_text: String = row.get(2).unwrap();
            let applied_on = DateTime::parse_from_rfc3339(&applied_on_text)
                .unwrap()
                .with_timezone(&Local);

            let checksum_text: String = row.get(3).unwrap();
            let name = row.get(1).unwrap();
            let checksum = checksum_text
                .parse::<u64>()
                .expect("checksum must be a valid u64");

            Ok(Migration::applied(version, name, applied_on, checksum))
        })
        // Collecting into `Result` stops at the first `Err`, like an early return.
        .collect()
}

/// Executes every statement of `queries` on `transaction`, commits, and
/// returns how many statements were run.
///
/// If a statement or the commit fails, the error is returned and the
/// transaction is dropped without being committed.
fn execute_and_commit(mut transaction: MTransaction, queries: &[&str]) -> Result<usize, MError> {
    for statement in queries {
        transaction.query_iter(statement)?;
    }
    transaction.commit()?;
    // Every statement succeeded, so the executed count is the slice length.
    Ok(queries.len())
}

/// Loads the applied migrations selected by `query` on `transaction`, then
/// commits it.
fn fetch_and_commit(mut transaction: MTransaction, query: &str) -> Result<Vec<Migration>, MError> {
    let applied = query_applied_migrations(&mut transaction, query)?;
    transaction.commit()?;
    Ok(applied)
}

/// Runs batches of statements on a plain connection inside one transaction.
impl Transaction for Conn {
    type Error = MError;

    /// Executes `queries` in a single transaction and returns how many ran.
    fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
        let transaction = self.start_transaction(get_tx_opts())?;
        execute_and_commit(transaction, queries)
    }
}

/// Runs batches of statements on a pooled connection inside one transaction.
impl Transaction for PooledConn {
    type Error = MError;

    /// Executes `queries` in a single transaction and returns how many ran.
    fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
        let transaction = self.start_transaction(get_tx_opts())?;
        execute_and_commit(transaction, queries)
    }
}

/// Reads applied migrations through a plain connection.
impl Query<Vec<Migration>> for Conn {
    /// Runs `query` in its own transaction and returns the migrations it yields.
    fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
        let transaction = self.start_transaction(get_tx_opts())?;
        fetch_and_commit(transaction, query)
    }
}

/// Reads applied migrations through a pooled connection.
impl Query<Vec<Migration>> for PooledConn {
    /// Runs `query` in its own transaction and returns the migrations it yields.
    fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
        let transaction = self.start_transaction(get_tx_opts())?;
        fetch_and_commit(transaction, query)
    }
}

impl Migrate for Conn {}

impl Migrate for PooledConn {}