refinery_core/drivers/
mysql.rs

1use crate::traits::sync::{Migrate, Query, Transaction};
2use crate::Migration;
3use mysql::{
4    error::Error as MError, prelude::Queryable, Conn, IsolationLevel, PooledConn,
5    Transaction as MTransaction, TxOpts,
6};
7use time::format_description::well_known::Rfc3339;
8use time::OffsetDateTime;
9
10fn get_tx_opts() -> TxOpts {
11    TxOpts::default()
12        .set_with_consistent_snapshot(true)
13        .set_access_mode(None)
14        .set_isolation_level(Some(IsolationLevel::RepeatableRead))
15}
16
17fn query_applied_migrations(
18    transaction: &mut MTransaction,
19    query: &str,
20) -> Result<Vec<Migration>, MError> {
21    let rows = transaction.query_iter(query)?;
22    let mut applied = Vec::new();
23    for row in rows {
24        let row = row?;
25        let version = row.get(0).unwrap();
26        let applied_on: String = row.get(2).unwrap();
27        // Safe to call unwrap, as we stored it in RFC3339 format on the database
28        let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
29        let checksum: String = row.get(3).unwrap();
30
31        applied.push(Migration::applied(
32            version,
33            row.get(1).unwrap(),
34            applied_on,
35            checksum
36                .parse::<u64>()
37                .expect("checksum must be a valid u64"),
38        ))
39    }
40    Ok(applied)
41}
42
43impl Transaction for Conn {
44    type Error = MError;
45
46    fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
47        let mut transaction = self.start_transaction(get_tx_opts())?;
48        let mut count = 0;
49        for query in queries.iter() {
50            transaction.query_iter(query)?;
51            count += 1;
52        }
53        transaction.commit()?;
54        Ok(count as usize)
55    }
56}
57
58impl Transaction for PooledConn {
59    type Error = MError;
60
61    fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
62        let mut transaction = self.start_transaction(get_tx_opts())?;
63        let mut count = 0;
64
65        for query in queries.iter() {
66            transaction.query_iter(query)?;
67            count += 1;
68        }
69        transaction.commit()?;
70        Ok(count as usize)
71    }
72}
73
74impl Query<Vec<Migration>> for Conn {
75    fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
76        let mut transaction = self.start_transaction(get_tx_opts())?;
77        let applied = query_applied_migrations(&mut transaction, query)?;
78        transaction.commit()?;
79        Ok(applied)
80    }
81}
82
83impl Query<Vec<Migration>> for PooledConn {
84    fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
85        let mut transaction = self.start_transaction(get_tx_opts())?;
86        let applied = query_applied_migrations(&mut transaction, query)?;
87        transaction.commit()?;
88        Ok(applied)
89    }
90}
91
// Opts `Conn` into the `Migrate` trait. Nothing is overridden here, so `Conn`
// relies entirely on the trait's default method implementations.
impl Migrate for Conn {}
// Opts `PooledConn` into the `Migrate` trait. Nothing is overridden here, so
// `PooledConn` relies entirely on the trait's default method implementations.
impl Migrate for PooledConn {}