refinery_core/drivers/
mysql.rs1use crate::traits::sync::{Migrate, Query, Transaction};
2use crate::Migration;
3use mysql::{
4 error::Error as MError, prelude::Queryable, Conn, IsolationLevel, PooledConn,
5 Transaction as MTransaction, TxOpts,
6};
7use time::format_description::well_known::Rfc3339;
8use time::OffsetDateTime;
9
10fn get_tx_opts() -> TxOpts {
11 TxOpts::default()
12 .set_with_consistent_snapshot(true)
13 .set_access_mode(None)
14 .set_isolation_level(Some(IsolationLevel::RepeatableRead))
15}
16
17fn query_applied_migrations(
18 transaction: &mut MTransaction,
19 query: &str,
20) -> Result<Vec<Migration>, MError> {
21 let rows = transaction.query_iter(query)?;
22 let mut applied = Vec::new();
23 for row in rows {
24 let row = row?;
25 let version = row.get(0).unwrap();
26 let applied_on: String = row.get(2).unwrap();
27 let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
29 let checksum: String = row.get(3).unwrap();
30
31 applied.push(Migration::applied(
32 version,
33 row.get(1).unwrap(),
34 applied_on,
35 checksum
36 .parse::<u64>()
37 .expect("checksum must be a valid u64"),
38 ))
39 }
40 Ok(applied)
41}
42
43impl Transaction for Conn {
44 type Error = MError;
45
46 fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
47 let mut transaction = self.start_transaction(get_tx_opts())?;
48 let mut count = 0;
49 for query in queries.iter() {
50 transaction.query_iter(query)?;
51 count += 1;
52 }
53 transaction.commit()?;
54 Ok(count as usize)
55 }
56}
57
58impl Transaction for PooledConn {
59 type Error = MError;
60
61 fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
62 let mut transaction = self.start_transaction(get_tx_opts())?;
63 let mut count = 0;
64
65 for query in queries.iter() {
66 transaction.query_iter(query)?;
67 count += 1;
68 }
69 transaction.commit()?;
70 Ok(count as usize)
71 }
72}
73
74impl Query<Vec<Migration>> for Conn {
75 fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
76 let mut transaction = self.start_transaction(get_tx_opts())?;
77 let applied = query_applied_migrations(&mut transaction, query)?;
78 transaction.commit()?;
79 Ok(applied)
80 }
81}
82
83impl Query<Vec<Migration>> for PooledConn {
84 fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
85 let mut transaction = self.start_transaction(get_tx_opts())?;
86 let applied = query_applied_migrations(&mut transaction, query)?;
87 transaction.commit()?;
88 Ok(applied)
89 }
90}
91
92impl Migrate for Conn {}
93impl Migrate for PooledConn {}