refinery_core/drivers/
mysql_async.rs1use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction};
2use crate::Migration;
3use async_trait::async_trait;
4use mysql_async::{
5 prelude::Queryable, Error as MError, IsolationLevel, Pool, Transaction as MTransaction, TxOpts,
6};
7use time::format_description::well_known::Rfc3339;
8use time::OffsetDateTime;
9
10async fn query_applied_migrations<'a>(
11 mut transaction: MTransaction<'a>,
12 query: &str,
13) -> Result<(MTransaction<'a>, Vec<Migration>), MError> {
14 let result = transaction.query(query).await?;
15
16 let applied = result
17 .into_iter()
18 .map(|row| {
19 let (version, name, applied_on, checksum): (i32, String, String, String) =
20 mysql_async::from_row(row);
21
22 let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
24 Migration::applied(
25 version,
26 name,
27 applied_on,
28 checksum
29 .parse::<u64>()
30 .expect("checksum must be a valid u64"),
31 )
32 })
33 .collect();
34
35 Ok((transaction, applied))
36}
37
38#[async_trait]
39impl AsyncTransaction for Pool {
40 type Error = MError;
41
42 async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
43 &mut self,
44 queries: T,
45 ) -> Result<usize, Self::Error> {
46 let mut conn = self.get_conn().await?;
47 let mut options = TxOpts::new();
48 options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
49
50 let mut transaction = conn.start_transaction(options).await?;
51 let mut count = 0;
52 for query in queries {
53 transaction.query_drop(query).await?;
54 count += 1;
55 }
56 transaction.commit().await?;
57 Ok(count as usize)
58 }
59}
60
61#[async_trait]
62impl AsyncQuery<Vec<Migration>> for Pool {
63 async fn query(
64 &mut self,
65 query: &str,
66 ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
67 let mut conn = self.get_conn().await?;
68 let mut options = TxOpts::new();
69 options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
70 let transaction = conn.start_transaction(options).await?;
71
72 let (transaction, applied) = query_applied_migrations(transaction, query).await?;
73 transaction.commit().await?;
74 Ok(applied)
75 }
76}
77
// Empty impl: nothing is overridden here, so `Pool` relies entirely on
// whatever `AsyncMigrate` provides by default.
impl AsyncMigrate for Pool {}