refinery_core/drivers/
postgres.rs1use crate::traits::sync::{Migrate, Query, Transaction};
2use crate::Migration;
3use postgres::{Client as PgClient, Error as PgError, Transaction as PgTransaction};
4use time::format_description::well_known::Rfc3339;
5use time::OffsetDateTime;
6
7fn query_applied_migrations(
8 transaction: &mut PgTransaction,
9 query: &str,
10) -> Result<Vec<Migration>, PgError> {
11 let rows = transaction.query(query, &[])?;
12 let mut applied = Vec::new();
13 for row in rows.into_iter() {
14 let version = row.get(0);
15 let applied_on: String = row.get(2);
16 let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
18
19 let checksum: String = row.get(3);
20
21 applied.push(Migration::applied(
22 version,
23 row.get(1),
24 applied_on,
25 checksum
26 .parse::<u64>()
27 .expect("checksum must be a valid u64"),
28 ));
29 }
30 Ok(applied)
31}
32
33impl Transaction for PgClient {
34 type Error = PgError;
35
36 fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
37 let mut transaction = PgClient::transaction(self)?;
38 let mut count = 0;
39 for query in queries.iter() {
40 PgTransaction::batch_execute(&mut transaction, query)?;
41 count += 1;
42 }
43 transaction.commit()?;
44 Ok(count as usize)
45 }
46}
47
48impl Query<Vec<Migration>> for PgClient {
49 fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
50 let mut transaction = PgClient::transaction(self)?;
51 let applied = query_applied_migrations(&mut transaction, query)?;
52 transaction.commit()?;
53 Ok(applied)
54 }
55}
56
57impl Migrate for PgClient {}