1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use crate::traits::sync::{Migrate, Query, Transaction};
use crate::Migration;
use mysql::{
error::Error as MError, prelude::Queryable, Conn, IsolationLevel, PooledConn,
Transaction as MTransaction, TxOpts,
};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
/// Builds the options used for every transaction opened in this file.
///
/// Starting from `TxOpts::default()`, the options request a consistent snapshot,
/// leave the access mode unset (`None`), and select the `RepeatableRead`
/// isolation level.
fn get_tx_opts() -> TxOpts {
    let isolation = Some(IsolationLevel::RepeatableRead);
    let opts = TxOpts::default();
    let opts = opts.set_with_consistent_snapshot(true);
    let opts = opts.set_access_mode(None);
    opts.set_isolation_level(isolation)
}
/// Runs `query` on `transaction` and turns every returned row into an applied
/// [`Migration`].
///
/// Columns are read by position: columns 0 and 1 are passed to
/// `Migration::applied` as its first two arguments, column 2 is read as a
/// string and parsed as an RFC 3339 timestamp, and column 3 is read as a string
/// and parsed as a `u64` checksum.
///
/// # Errors
/// Returns the driver error if running the query or fetching a row fails;
/// processing stops at the first such error.
///
/// # Panics
/// Panics if `get(..).unwrap()` fails for any of the four columns, if column 2
/// is not a valid RFC 3339 timestamp, or if column 3 is not a valid `u64`.
fn query_applied_migrations(
    transaction: &mut MTransaction,
    query: &str,
) -> Result<Vec<Migration>, MError> {
    transaction
        .query_iter(query)?
        .map(|fetched| -> Result<Migration, MError> {
            let record = fetched?;
            let version = record.get(0).unwrap();
            let raw_timestamp: String = record.get(2).unwrap();
            let timestamp = OffsetDateTime::parse(&raw_timestamp, &Rfc3339).unwrap();
            let raw_checksum: String = record.get(3).unwrap();
            Ok(Migration::applied(
                version,
                record.get(1).unwrap(),
                timestamp,
                raw_checksum
                    .parse::<u64>()
                    .expect("checksum must be a valid u64"),
            ))
        })
        .collect()
}
impl Transaction for Conn {
    type Error = MError;

    /// Runs every statement in `queries`, in order, inside a single transaction
    /// opened with `get_tx_opts()`, then commits it.
    ///
    /// Returns the number of statements executed, which on success is
    /// `queries.len()`.
    ///
    /// # Errors
    /// Returns the driver error from starting the transaction, from the first
    /// failing statement, or from the commit. On such an early return the
    /// transaction is dropped without `commit` having been called.
    fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
        let mut transaction = self.start_transaction(get_tx_opts())?;
        for query in queries {
            // Only success or failure matters here; any result set is discarded.
            transaction.query_drop(query)?;
        }
        transaction.commit()?;
        // Reaching this point means every statement ran, so the count is simply
        // the slice length -- no separately typed counter or `as` cast needed.
        Ok(queries.len())
    }
}
impl Transaction for PooledConn {
    type Error = MError;

    /// Runs every statement in `queries`, in order, inside a single transaction
    /// opened with `get_tx_opts()`, then commits it.
    ///
    /// Returns the number of statements executed, which on success is
    /// `queries.len()`.
    ///
    /// # Errors
    /// Returns the driver error from starting the transaction, from the first
    /// failing statement, or from the commit. On such an early return the
    /// transaction is dropped without `commit` having been called.
    fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
        let mut transaction = self.start_transaction(get_tx_opts())?;
        for query in queries {
            // Only success or failure matters here; any result set is discarded.
            transaction.query_drop(query)?;
        }
        transaction.commit()?;
        // Reaching this point means every statement ran, so the count is simply
        // the slice length -- no separately typed counter or `as` cast needed.
        Ok(queries.len())
    }
}
impl Query<Vec<Migration>> for Conn {
    /// Fetches the applied migrations by running `query` in its own transaction.
    ///
    /// Opens a transaction with `get_tx_opts()`, hands row decoding to
    /// `query_applied_migrations`, and commits before returning the rows.
    ///
    /// # Errors
    /// Propagates any driver error from opening the transaction, running the
    /// query, or committing.
    fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
        let mut tx = self.start_transaction(get_tx_opts())?;
        let migrations = query_applied_migrations(&mut tx, query)?;
        // Hand the rows back only once the commit itself has succeeded.
        tx.commit().map(|()| migrations)
    }
}
impl Query<Vec<Migration>> for PooledConn {
    /// Fetches the applied migrations by running `query` in its own transaction.
    ///
    /// Opens a transaction with `get_tx_opts()`, hands row decoding to
    /// `query_applied_migrations`, and commits before returning the rows.
    ///
    /// # Errors
    /// Propagates any driver error from opening the transaction, running the
    /// query, or committing.
    fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
        let mut tx = self.start_transaction(get_tx_opts())?;
        let migrations = query_applied_migrations(&mut tx, query)?;
        // Hand the rows back only once the commit itself has succeeded.
        tx.commit().map(|()| migrations)
    }
}
// Opts `Conn` into the crate's synchronous `Migrate` trait. The impl body is
// empty, so no trait item is overridden here.
impl Migrate for Conn {}
// Opts `PooledConn` into the crate's synchronous `Migrate` trait. The impl body
// is empty, so no trait item is overridden here.
impl Migrate for PooledConn {}