refinery_core/drivers/
mysql.rs1use crate::traits::sync::{Migrate, Query, Transaction};
2use crate::Migration;
3use mysql::{
4 error::Error as MError, prelude::Queryable, Conn, IsolationLevel, PooledConn,
5 Transaction as MTransaction, TxOpts,
6};
7use time::format_description::well_known::Rfc3339;
8use time::OffsetDateTime;
9
10fn get_tx_opts() -> TxOpts {
11 TxOpts::default()
12 .set_with_consistent_snapshot(true)
13 .set_access_mode(None)
14 .set_isolation_level(Some(IsolationLevel::RepeatableRead))
15}
16
17fn query_applied_migrations(
18 transaction: &mut MTransaction,
19 query: &str,
20) -> Result<Vec<Migration>, MError> {
21 let rows = transaction.query_iter(query)?;
22 let mut applied = Vec::new();
23 for row in rows {
24 let row = row?;
25 let version = row.get(0).unwrap();
26 let applied_on: String = row.get(2).unwrap();
27 let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
29 let checksum: String = row.get(3).unwrap();
30
31 applied.push(Migration::applied(
32 version,
33 row.get(1).unwrap(),
34 applied_on,
35 checksum
36 .parse::<u64>()
37 .expect("checksum must be a valid u64"),
38 ))
39 }
40 Ok(applied)
41}
42
43impl Transaction for Conn {
44 type Error = MError;
45
46 fn execute<'a, T: Iterator<Item = &'a str>>(
47 &mut self,
48 queries: T,
49 ) -> Result<usize, Self::Error> {
50 let mut transaction = self.start_transaction(get_tx_opts())?;
51 let mut count = 0;
52 for query in queries {
53 transaction.query_iter(query)?;
54 count += 1;
55 }
56 transaction.commit()?;
57 Ok(count as usize)
58 }
59}
60
61impl Transaction for PooledConn {
62 type Error = MError;
63
64 fn execute<'a, T: Iterator<Item = &'a str>>(
65 &mut self,
66 queries: T,
67 ) -> Result<usize, Self::Error> {
68 let mut transaction = self.start_transaction(get_tx_opts())?;
69 let mut count = 0;
70
71 for query in queries {
72 transaction.query_iter(query)?;
73 count += 1;
74 }
75 transaction.commit()?;
76 Ok(count as usize)
77 }
78}
79
80impl Query<Vec<Migration>> for Conn {
81 fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
82 let mut transaction = self.start_transaction(get_tx_opts())?;
83 let applied = query_applied_migrations(&mut transaction, query)?;
84 transaction.commit()?;
85 Ok(applied)
86 }
87}
88
89impl Query<Vec<Migration>> for PooledConn {
90 fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
91 let mut transaction = self.start_transaction(get_tx_opts())?;
92 let applied = query_applied_migrations(&mut transaction, query)?;
93 transaction.commit()?;
94 Ok(applied)
95 }
96}
97
98impl Migrate for Conn {}
99impl Migrate for PooledConn {}