refinery_core/drivers/
tokio_postgres.rs1use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction};
2use crate::Migration;
3use async_trait::async_trait;
4use time::format_description::well_known::Rfc3339;
5use time::OffsetDateTime;
6use tokio_postgres::error::Error as PgError;
7use tokio_postgres::{Client, Transaction as PgTransaction};
8
9async fn query_applied_migrations(
10 transaction: &PgTransaction<'_>,
11 query: &str,
12) -> Result<Vec<Migration>, PgError> {
13 let rows = transaction.query(query, &[]).await?;
14 let mut applied = Vec::new();
15 for row in rows.into_iter() {
16 let version = row.get(0);
17 let applied_on: String = row.get(2);
18 let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
20 let checksum: String = row.get(3);
21
22 applied.push(Migration::applied(
23 version,
24 row.get(1),
25 applied_on,
26 checksum
27 .parse::<u64>()
28 .expect("checksum must be a valid u64"),
29 ));
30 }
31 Ok(applied)
32}
33
34#[async_trait]
35impl AsyncTransaction for Client {
36 type Error = PgError;
37
38 async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
39 let transaction = self.transaction().await?;
40 let mut count = 0;
41 for query in queries {
42 transaction.batch_execute(query).await?;
43 count += 1;
44 }
45 transaction.commit().await?;
46 Ok(count as usize)
47 }
48}
49
50#[async_trait]
51impl AsyncQuery<Vec<Migration>> for Client {
52 async fn query(
53 &mut self,
54 query: &str,
55 ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
56 let transaction = self.transaction().await?;
57 let applied = query_applied_migrations(&transaction, query).await?;
58 transaction.commit().await?;
59 Ok(applied)
60 }
61}
62
63impl AsyncMigrate for Client {}