refinery_core/drivers/
mysql_async.rs

1use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction};
2use crate::util::SchemaVersion;
3use crate::Migration;
4use async_trait::async_trait;
5use mysql_async::{
6    prelude::Queryable, Error as MError, IsolationLevel, Pool, Transaction as MTransaction, TxOpts,
7};
8use time::format_description::well_known::Rfc3339;
9use time::OffsetDateTime;
10
11async fn query_applied_migrations<'a>(
12    mut transaction: MTransaction<'a>,
13    query: &str,
14) -> Result<(MTransaction<'a>, Vec<Migration>), MError> {
15    let result = transaction.query(query).await?;
16
17    let applied = result
18        .into_iter()
19        .map(|row| {
20            let (version, name, applied_on, checksum): (SchemaVersion, String, String, String) =
21                mysql_async::from_row(row);
22
23            // Safe to call unwrap, as we stored it in RFC3339 format on the database
24            let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
25            Migration::applied(
26                version,
27                name,
28                applied_on,
29                checksum
30                    .parse::<u64>()
31                    .expect("checksum must be a valid u64"),
32            )
33        })
34        .collect();
35
36    Ok((transaction, applied))
37}
38
39#[async_trait]
40impl AsyncTransaction for Pool {
41    type Error = MError;
42
43    async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
44        &mut self,
45        queries: T,
46    ) -> Result<usize, Self::Error> {
47        let mut conn = self.get_conn().await?;
48        let mut options = TxOpts::new();
49        options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
50
51        let mut transaction = conn.start_transaction(options).await?;
52        let mut count = 0;
53        for query in queries {
54            transaction.query_drop(query).await?;
55            count += 1;
56        }
57        transaction.commit().await?;
58        Ok(count as usize)
59    }
60}
61
62#[async_trait]
63impl AsyncQuery<Vec<Migration>> for Pool {
64    async fn query(
65        &mut self,
66        query: &str,
67    ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
68        let mut conn = self.get_conn().await?;
69        let mut options = TxOpts::new();
70        options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
71        let transaction = conn.start_transaction(options).await?;
72
73        let (transaction, applied) = query_applied_migrations(transaction, query).await?;
74        transaction.commit().await?;
75        Ok(applied)
76    }
77}
78
// Opts `Pool` into `AsyncMigrate`. The impl body is empty, so every item of
// the trait comes from its provided defaults; nothing is overridden here.
impl AsyncMigrate for Pool {}