refinery_core/drivers/
mysql_async.rs

1use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction};
2use crate::Migration;
3use async_trait::async_trait;
4use mysql_async::{
5    prelude::Queryable, Error as MError, IsolationLevel, Pool, Transaction as MTransaction, TxOpts,
6};
7use time::format_description::well_known::Rfc3339;
8use time::OffsetDateTime;
9
10async fn query_applied_migrations<'a>(
11    mut transaction: MTransaction<'a>,
12    query: &str,
13) -> Result<(MTransaction<'a>, Vec<Migration>), MError> {
14    let result = transaction.query(query).await?;
15
16    let applied = result
17        .into_iter()
18        .map(|row| {
19            let (version, name, applied_on, checksum): (i32, String, String, String) =
20                mysql_async::from_row(row);
21
22            // Safe to call unwrap, as we stored it in RFC3339 format on the database
23            let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
24            Migration::applied(
25                version,
26                name,
27                applied_on,
28                checksum
29                    .parse::<u64>()
30                    .expect("checksum must be a valid u64"),
31            )
32        })
33        .collect();
34
35    Ok((transaction, applied))
36}
37
38#[async_trait]
39impl AsyncTransaction for Pool {
40    type Error = MError;
41
42    async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
43        &mut self,
44        queries: T,
45    ) -> Result<usize, Self::Error> {
46        let mut conn = self.get_conn().await?;
47        let mut options = TxOpts::new();
48        options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
49
50        let mut transaction = conn.start_transaction(options).await?;
51        let mut count = 0;
52        for query in queries {
53            transaction.query_drop(query).await?;
54            count += 1;
55        }
56        transaction.commit().await?;
57        Ok(count as usize)
58    }
59}
60
61#[async_trait]
62impl AsyncQuery<Vec<Migration>> for Pool {
63    async fn query(
64        &mut self,
65        query: &str,
66    ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
67        let mut conn = self.get_conn().await?;
68        let mut options = TxOpts::new();
69        options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
70        let transaction = conn.start_transaction(options).await?;
71
72        let (transaction, applied) = query_applied_migrations(transaction, query).await?;
73        transaction.commit().await?;
74        Ok(applied)
75    }
76}
77
// Opts `Pool` into `AsyncMigrate`. The impl body is empty, so nothing is overridden here and
// `Pool` uses whatever items the trait itself provides.
impl AsyncMigrate for Pool {}