refinery_core/drivers/
postgres.rs1use crate::traits::sync::{Migrate, Query, Transaction};
2use crate::Migration;
3use postgres::{Client as PgClient, Error as PgError, Transaction as PgTransaction};
4use time::format_description::well_known::Rfc3339;
5use time::OffsetDateTime;
6
7fn query_applied_migrations(
8 transaction: &mut PgTransaction,
9 query: &str,
10) -> Result<Vec<Migration>, PgError> {
11 let rows = transaction.query(query, &[])?;
12 let mut applied = Vec::new();
13 for row in rows.into_iter() {
14 let version = row.get(0);
15 let applied_on: String = row.get(2);
16 let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
18
19 let checksum: String = row.get(3);
20
21 applied.push(Migration::applied(
22 version,
23 row.get(1),
24 applied_on,
25 checksum
26 .parse::<u64>()
27 .expect("checksum must be a valid u64"),
28 ));
29 }
30 Ok(applied)
31}
32
33impl Transaction for PgClient {
34 type Error = PgError;
35
36 fn execute<'a, T: Iterator<Item = &'a str>>(
37 &mut self,
38 queries: T,
39 ) -> Result<usize, Self::Error> {
40 let mut transaction = PgClient::transaction(self)?;
41 let mut count = 0;
42 for query in queries {
43 PgTransaction::batch_execute(&mut transaction, query)?;
44 count += 1;
45 }
46 transaction.commit()?;
47 Ok(count as usize)
48 }
49}
50
51impl Query<Vec<Migration>> for PgClient {
52 fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
53 let mut transaction = PgClient::transaction(self)?;
54 let applied = query_applied_migrations(&mut transaction, query)?;
55 transaction.commit()?;
56 Ok(applied)
57 }
58}
59
60impl Migrate for PgClient {}