1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction}; use crate::Migration; use async_trait::async_trait; use chrono::{DateTime, Local}; use tokio_postgres::error::Error as PgError; use tokio_postgres::{Client, Transaction as PgTransaction}; async fn query_applied_migrations( transaction: &PgTransaction<'_>, query: &str, ) -> Result<Vec<Migration>, PgError> { let rows = transaction.query(query, &[]).await?; let mut applied = Vec::new(); for row in rows.into_iter() { let version = row.get(0); let applied_on: String = row.get(2); let applied_on = DateTime::parse_from_rfc3339(&applied_on) .unwrap() .with_timezone(&Local); let checksum: String = row.get(3); applied.push(Migration::applied( version, row.get(1), applied_on, checksum .parse::<u64>() .expect("checksum must be a valid u64"), )); } Ok(applied) } #[async_trait] impl AsyncTransaction for Client { type Error = PgError; async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> { let transaction = self.transaction().await?; let mut count = 0; for query in queries { transaction.batch_execute(*query).await?; count += 1; } transaction.commit().await?; Ok(count as usize) } } #[async_trait] impl AsyncQuery<Vec<Migration>> for Client { async fn query( &mut self, query: &str, ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> { let transaction = self.transaction().await?; let applied = query_applied_migrations(&transaction, query).await?; transaction.commit().await?; Ok(applied) } } impl AsyncMigrate for Client {}