refinery_core/drivers/
mysql_async.rs1use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction};
2use crate::Migration;
3use async_trait::async_trait;
4use mysql_async::{
5 prelude::Queryable, Error as MError, IsolationLevel, Pool, Transaction as MTransaction, TxOpts,
6};
7use time::format_description::well_known::Rfc3339;
8use time::OffsetDateTime;
9
10async fn query_applied_migrations<'a>(
11 mut transaction: MTransaction<'a>,
12 query: &str,
13) -> Result<(MTransaction<'a>, Vec<Migration>), MError> {
14 let result = transaction.query(query).await?;
15
16 let applied = result
17 .into_iter()
18 .map(|row| {
19 let (version, name, applied_on, checksum): (i32, String, String, String) =
20 mysql_async::from_row(row);
21
22 let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
24 Migration::applied(
25 version,
26 name,
27 applied_on,
28 checksum
29 .parse::<u64>()
30 .expect("checksum must be a valid u64"),
31 )
32 })
33 .collect();
34
35 Ok((transaction, applied))
36}
37
38#[async_trait]
39impl AsyncTransaction for Pool {
40 type Error = MError;
41
42 async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
43 let mut conn = self.get_conn().await?;
44 let mut options = TxOpts::new();
45 options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
46
47 let mut transaction = conn.start_transaction(options).await?;
48 let mut count = 0;
49 for query in queries {
50 transaction.query_drop(*query).await?;
51 count += 1;
52 }
53 transaction.commit().await?;
54 Ok(count as usize)
55 }
56}
57
58#[async_trait]
59impl AsyncQuery<Vec<Migration>> for Pool {
60 async fn query(
61 &mut self,
62 query: &str,
63 ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
64 let mut conn = self.get_conn().await?;
65 let mut options = TxOpts::new();
66 options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
67 let transaction = conn.start_transaction(options).await?;
68
69 let (transaction, applied) = query_applied_migrations(transaction, query).await?;
70 transaction.commit().await?;
71 Ok(applied)
72 }
73}
74
75impl AsyncMigrate for Pool {}