refinery_core/drivers/
tokio_postgres.rs1use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction};
2use crate::Migration;
3use async_trait::async_trait;
4use time::format_description::well_known::Rfc3339;
5use time::OffsetDateTime;
6use tokio_postgres::error::Error as PgError;
7use tokio_postgres::{Client, Transaction as PgTransaction};
8
9async fn query_applied_migrations(
10 transaction: &PgTransaction<'_>,
11 query: &str,
12) -> Result<Vec<Migration>, PgError> {
13 let rows = transaction.query(query, &[]).await?;
14 let mut applied = Vec::new();
15 for row in rows.into_iter() {
16 let version = row.get(0);
17 let applied_on: String = row.get(2);
18 let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339).unwrap();
20 let checksum: String = row.get(3);
21
22 applied.push(Migration::applied(
23 version,
24 row.get(1),
25 applied_on,
26 checksum
27 .parse::<u64>()
28 .expect("checksum must be a valid u64"),
29 ));
30 }
31 Ok(applied)
32}
33
34#[async_trait]
35impl AsyncTransaction for Client {
36 type Error = PgError;
37
38 async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
39 &mut self,
40 queries: T,
41 ) -> Result<usize, Self::Error> {
42 let transaction = self.transaction().await?;
43 let mut count = 0;
44 for query in queries {
45 transaction.batch_execute(query).await?;
46 count += 1;
47 }
48 transaction.commit().await?;
49 Ok(count as usize)
50 }
51}
52
53#[async_trait]
54impl AsyncQuery<Vec<Migration>> for Client {
55 async fn query(
56 &mut self,
57 query: &str,
58 ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
59 let transaction = self.transaction().await?;
60 let applied = query_applied_migrations(&transaction, query).await?;
61 transaction.commit().await?;
62 Ok(applied)
63 }
64}
65
66impl AsyncMigrate for Client {}