refinery_core/drivers/
postgres.rs

1use crate::traits::sync::{Migrate, Query, Transaction};
2use crate::Migration;
3use postgres::{Client as PgClient, Error as PgError, Transaction as PgTransaction};
4use time::format_description::well_known::Rfc3339;
5use time::OffsetDateTime;
6
7fn query_applied_migrations(
8    transaction: &mut PgTransaction,
9    query: &str,
10) -> Result<Vec<Migration>, PgError> {
11    let rows = transaction.query(query, &[])?;
12    let mut applied = Vec::new();
13    for row in rows.into_iter() {
14        let version = row.get(0);
15        let applied_on: String = row.get(2);
16        // Safe to call unwrap, as we stored it in RFC3339 format on the database
17        let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
18
19        let checksum: String = row.get(3);
20
21        applied.push(Migration::applied(
22            version,
23            row.get(1),
24            applied_on,
25            checksum
26                .parse::<u64>()
27                .expect("checksum must be a valid u64"),
28        ));
29    }
30    Ok(applied)
31}
32
33impl Transaction for PgClient {
34    type Error = PgError;
35
36    fn execute<'a, T: Iterator<Item = &'a str>>(
37        &mut self,
38        queries: T,
39    ) -> Result<usize, Self::Error> {
40        let mut transaction = PgClient::transaction(self)?;
41        let mut count = 0;
42        for query in queries {
43            PgTransaction::batch_execute(&mut transaction, query)?;
44            count += 1;
45        }
46        transaction.commit()?;
47        Ok(count as usize)
48    }
49}
50
51impl Query<Vec<Migration>> for PgClient {
52    fn query(&mut self, query: &str) -> Result<Vec<Migration>, Self::Error> {
53        let mut transaction = PgClient::transaction(self)?;
54        let applied = query_applied_migrations(&mut transaction, query)?;
55        transaction.commit()?;
56        Ok(applied)
57    }
58}
59
60impl Migrate for PgClient {}