refinery_core/drivers/
mysql_async.rs1use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction};
2use crate::util::SchemaVersion;
3use crate::Migration;
4use async_trait::async_trait;
5use mysql_async::{
6 prelude::Queryable, Error as MError, IsolationLevel, Pool, Transaction as MTransaction, TxOpts,
7};
8use time::format_description::well_known::Rfc3339;
9use time::OffsetDateTime;
10
11async fn query_applied_migrations<'a>(
12 mut transaction: MTransaction<'a>,
13 query: &str,
14) -> Result<(MTransaction<'a>, Vec<Migration>), MError> {
15 let result = transaction.query(query).await?;
16
17 let applied = result
18 .into_iter()
19 .map(|row| {
20 let (version, name, applied_on, checksum): (SchemaVersion, String, String, String) =
21 mysql_async::from_row(row);
22
23 let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
25 Migration::applied(
26 version,
27 name,
28 applied_on,
29 checksum
30 .parse::<u64>()
31 .expect("checksum must be a valid u64"),
32 )
33 })
34 .collect();
35
36 Ok((transaction, applied))
37}
38
39#[async_trait]
40impl AsyncTransaction for Pool {
41 type Error = MError;
42
43 async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
44 &mut self,
45 queries: T,
46 ) -> Result<usize, Self::Error> {
47 let mut conn = self.get_conn().await?;
48 let mut options = TxOpts::new();
49 options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
50
51 let mut transaction = conn.start_transaction(options).await?;
52 let mut count = 0;
53 for query in queries {
54 transaction.query_drop(query).await?;
55 count += 1;
56 }
57 transaction.commit().await?;
58 Ok(count as usize)
59 }
60}
61
62#[async_trait]
63impl AsyncQuery<Vec<Migration>> for Pool {
64 async fn query(
65 &mut self,
66 query: &str,
67 ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
68 let mut conn = self.get_conn().await?;
69 let mut options = TxOpts::new();
70 options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
71 let transaction = conn.start_transaction(options).await?;
72
73 let (transaction, applied) = query_applied_migrations(transaction, query).await?;
74 transaction.commit().await?;
75 Ok(applied)
76 }
77}
78
79impl AsyncMigrate for Pool {}