refinery_core/drivers/
mysql_async.rs

1use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction};
2use crate::Migration;
3use async_trait::async_trait;
4use mysql_async::{
5    prelude::Queryable, Error as MError, IsolationLevel, Pool, Transaction as MTransaction, TxOpts,
6};
7use time::format_description::well_known::Rfc3339;
8use time::OffsetDateTime;
9
10async fn query_applied_migrations<'a>(
11    mut transaction: MTransaction<'a>,
12    query: &str,
13) -> Result<(MTransaction<'a>, Vec<Migration>), MError> {
14    let result = transaction.query(query).await?;
15
16    let applied = result
17        .into_iter()
18        .map(|row| {
19            let (version, name, applied_on, checksum): (i32, String, String, String) =
20                mysql_async::from_row(row);
21
22            // Safe to call unwrap, as we stored it in RFC3339 format on the database
23            let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
24            Migration::applied(
25                version,
26                name,
27                applied_on,
28                checksum
29                    .parse::<u64>()
30                    .expect("checksum must be a valid u64"),
31            )
32        })
33        .collect();
34
35    Ok((transaction, applied))
36}
37
38#[async_trait]
39impl AsyncTransaction for Pool {
40    type Error = MError;
41
42    async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
43        let mut conn = self.get_conn().await?;
44        let mut options = TxOpts::new();
45        options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
46
47        let mut transaction = conn.start_transaction(options).await?;
48        let mut count = 0;
49        for query in queries {
50            transaction.query_drop(*query).await?;
51            count += 1;
52        }
53        transaction.commit().await?;
54        Ok(count as usize)
55    }
56}
57
58#[async_trait]
59impl AsyncQuery<Vec<Migration>> for Pool {
60    async fn query(
61        &mut self,
62        query: &str,
63    ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
64        let mut conn = self.get_conn().await?;
65        let mut options = TxOpts::new();
66        options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
67        let transaction = conn.start_transaction(options).await?;
68
69        let (transaction, applied) = query_applied_migrations(transaction, query).await?;
70        transaction.commit().await?;
71        Ok(applied)
72    }
73}
74
75impl AsyncMigrate for Pool {}