refinery_core/drivers/
mysql.rs

1use crate::traits::sync::{Migrate, Query, Transaction};
2use crate::Migration;
3use mysql::{
4    error::Error as MError, prelude::Queryable, Conn, IsolationLevel, PooledConn,
5    Transaction as MTransaction, TxOpts,
6};
7use time::format_description::well_known::Rfc3339;
8use time::OffsetDateTime;
9
10fn get_tx_opts() -> TxOpts {
11    TxOpts::default()
12        .set_with_consistent_snapshot(true)
13        .set_access_mode(None)
14        .set_isolation_level(Some(IsolationLevel::RepeatableRead))
15}
16
17fn query_applied_migrations(
18    transaction: &mut MTransaction,
19    query: &str,
20) -> Result<Vec<Migration>, MError> {
21    let rows = transaction.query_iter(query)?;
22    let mut applied = Vec::new();
23    for row in rows {
24        let row = row?;
25        let version = row.get(0).unwrap();
26        let applied_on: String = row.get(2).unwrap();
27        // Safe to call unwrap, as we stored it in RFC3339 format on the database
28        let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
29        let checksum: String = row.get(3).unwrap();
30
31        applied.push(Migration::applied(
32            version,
33            row.get(1).unwrap(),
34            applied_on,
35            checksum
36                .parse::<u64>()
37                .expect("checksum must be a valid u64"),
38        ))
39    }
40    Ok(applied)
41}
42
43impl Transaction for Conn {
44    type Error = MError;
45
46    fn execute<'a, T: Iterator<Item = &'a str>>(
47        &mut self,
48        queries: T,
49    ) -> Result<usize, Self::Error> {
50        let mut transaction = self.start_transaction(get_tx_opts())?;
51        let mut count = 0;
52        for query in queries {
53            transaction.query_iter(query)?;
54            count += 1;
55        }
56        transaction.commit()?;
57        Ok(count as usize)
58    }
59}
60
61impl Transaction for PooledConn {
62    type Error = MError;
63
64    fn execute<'a, T: Iterator<Item = &'a str>>(
65        &mut self,
66        queries: T,
67    ) -> Result<usize, Self::Error> {
68        let mut transaction = self.start_transaction(get_tx_opts())?;
69        let mut count = 0;
70
71        for query in queries {
72            transaction.query_iter(query)?;
73            count += 1;
74        }
75        transaction.commit()?;
76        Ok(count as usize)
77    }
78}
79
80impl Query<Vec<Migration>> for Conn {
81    fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
82        let mut transaction = self.start_transaction(get_tx_opts())?;
83        let applied = query_applied_migrations(&mut transaction, query)?;
84        transaction.commit()?;
85        Ok(applied)
86    }
87}
88
89impl Query<Vec<Migration>> for PooledConn {
90    fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
91        let mut transaction = self.start_transaction(get_tx_opts())?;
92        let applied = query_applied_migrations(&mut transaction, query)?;
93        transaction.commit()?;
94        Ok(applied)
95    }
96}
97
98impl Migrate for Conn {}
99impl Migrate for PooledConn {}