refinery_core/drivers/
tokio_postgres.rs

1use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction};
2use crate::Migration;
3use async_trait::async_trait;
4use time::format_description::well_known::Rfc3339;
5use time::OffsetDateTime;
6use tokio_postgres::error::Error as PgError;
7use tokio_postgres::{Client, Transaction as PgTransaction};
8
9async fn query_applied_migrations(
10    transaction: &PgTransaction<'_>,
11    query: &str,
12) -> Result<Vec<Migration>, PgError> {
13    let rows = transaction.query(query, &[]).await?;
14    let mut applied = Vec::new();
15    for row in rows.into_iter() {
16        let version = row.get(0);
17        let applied_on: String = row.get(2);
18        // Safe to call unwrap, as we stored it in RFC3339 format on the database
19        let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
20        let checksum: String = row.get(3);
21
22        applied.push(Migration::applied(
23            version,
24            row.get(1),
25            applied_on,
26            checksum
27                .parse::<u64>()
28                .expect("checksum must be a valid u64"),
29        ));
30    }
31    Ok(applied)
32}
33
34#[async_trait]
35impl AsyncTransaction for Client {
36    type Error = PgError;
37
38    async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
39        let transaction = self.transaction().await?;
40        let mut count = 0;
41        for query in queries {
42            transaction.batch_execute(query).await?;
43            count += 1;
44        }
45        transaction.commit().await?;
46        Ok(count as usize)
47    }
48}
49
50#[async_trait]
51impl AsyncQuery<Vec<Migration>> for Client {
52    async fn query(
53        &mut self,
54        query: &str,
55    ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
56        let transaction = self.transaction().await?;
57        let applied = query_applied_migrations(&transaction, query).await?;
58        transaction.commit().await?;
59        Ok(applied)
60    }
61}
62
63impl AsyncMigrate for Client {}