use crate::adapter::DbAdapterError;
use sqlx::postgres::PgPool;
use std::collections::BTreeMap;
use std::future::Future;
use argentum_log_business::LoggerTrait;
use sqlx::query::{Query, QueryAs};
use sqlx::{Error, Execute, Executor, FromRow, Postgres, Transaction};
use sqlx_postgres::{PgArguments, PgRow};
use std::sync::Arc;
/// A borrowed, key-ordered view of row data.
///
/// NOTE(review): presumably keys are column names and values are the raw column
/// bytes of one database row — confirm against the code that builds `DbRow`s.
///
/// Both keys and values borrow from storage that must outlive `'r`; the struct
/// itself owns only the map.
#[derive(Debug, Clone, Default)]
pub struct DbRow<'r> {
    /// Entries of the row, iterated in ascending key order (`BTreeMap`).
    pub data: BTreeMap<&'r str, &'r [u8]>,
}
pub trait FromDbRow: Sized {
fn from_db_row(row: DbRow) -> Result<Self, Error>;
}
/// Postgres adapter built on `sqlx` that pairs a shared connection pool with a
/// logger implementing `LoggerTrait`.
pub struct SqlxPostgresAdapter<L: LoggerTrait> {
    /// Shared handle to the Postgres connection pool (publicly accessible).
    pub pool: Arc<PgPool>,
    /// Shared logger handle; private to the adapter.
    logger: Arc<L>,
}
impl<L> SqlxPostgresAdapter<L>
where
    L: LoggerTrait,
{
    /// Creates an adapter from a shared connection pool and a shared logger.
    pub fn new(pool: Arc<PgPool>, logger: Arc<L>) -> Self {
        Self { pool, logger }
    }

    /// Executes `query` using the adapter's own pool as the executor.
    ///
    /// This is a thin wrapper around `exec_with_executor`; the returned future
    /// is `Send` and borrows `self` for `'q`.
    ///
    /// # Errors
    ///
    /// Resolves to a `DbAdapterError` carrying the driver message and the SQL
    /// text when execution fails.
    pub fn exec<'q>(
        &'q self,
        query: Query<'q, Postgres, PgArguments>,
    ) -> impl Future<Output = Result<u64, DbAdapterError>> + Send + 'q {
        self.exec_with_executor(query, &*self.pool)
    }

    /// Executes `query` on the supplied `executor` and returns the number of
    /// affected rows.
    ///
    /// The SQL text is logged at debug level before execution and a trace
    /// message is logged once the call returns, whether it succeeded or not.
    ///
    /// # Errors
    ///
    /// Returns a `DbAdapterError` with the driver message in `msg` and the SQL
    /// text in `sql` when execution fails.
    pub async fn exec_with_executor<'q, E>(
        &'q self,
        query: Query<'q, Postgres, PgArguments>,
        executor: E,
    ) -> Result<u64, DbAdapterError>
    where
        E: Executor<'q, Database = Postgres> + 'q,
    {
        // Capture the SQL up front: `execute` consumes the query, and the text
        // is needed afterwards for error reporting.
        let sql = query.sql().to_string();
        self.logger.debug(sql.clone());
        let result = query.execute(executor).await;
        self.logger.trace("done");
        result
            .map(|outcome| outcome.rows_affected())
            .map_err(|e| DbAdapterError {
                msg: e.to_string(),
                sql: Some(sql),
            })
    }

    /// Runs `query_as` against the adapter's pool and returns at most one row
    /// mapped to `F`.
    ///
    /// Returns `Ok(None)` when the query yields no row. The SQL text is logged
    /// at debug level before execution and a trace message afterwards.
    ///
    /// # Errors
    ///
    /// Returns a `DbAdapterError` with the driver message in `msg` and the SQL
    /// text in `sql` when the query fails.
    pub async fn fetch_one<'q, F>(
        &'q self,
        query_as: QueryAs<'q, Postgres, F, PgArguments>,
    ) -> Result<Option<F>, DbAdapterError>
    where
        F: Send + Unpin + for<'r> FromRow<'r, PgRow> + 'q,
    {
        // Capture the SQL up front: `fetch_optional` consumes the query.
        let sql = query_as.sql().to_string();
        self.logger.debug(sql.clone());
        let result: Result<Option<F>, Error> = query_as.fetch_optional(&*self.pool).await;
        self.logger.trace("SQL done");
        result.map_err(|e| DbAdapterError {
            msg: e.to_string(),
            sql: Some(sql),
        })
    }

    /// Begins a new transaction on the adapter's pool.
    ///
    /// # Errors
    ///
    /// Returns a `DbAdapterError` (with `sql` set to `None`) when the pool
    /// fails to start a transaction.
    pub async fn begin_transaction(
        &self,
    ) -> Result<Transaction<'static, Postgres>, DbAdapterError> {
        self.logger.debug("Begin transaction");
        self.pool.begin().await.map_err(|e| DbAdapterError {
            msg: e.to_string(),
            sql: None,
        })
    }

    /// Commits `tx`, consuming it.
    ///
    /// # Errors
    ///
    /// Returns a `DbAdapterError` (with `sql` set to `None`) when the commit
    /// fails.
    pub async fn commit<'a>(&'a self, tx: Transaction<'a, Postgres>) -> Result<(), DbAdapterError> {
        self.logger.debug("Commit transaction");
        tx.commit().await.map_err(|e| DbAdapterError {
            msg: e.to_string(),
            sql: None,
        })
    }

    /// Rolls back `tx`, consuming it.
    ///
    /// # Errors
    ///
    /// Returns a `DbAdapterError` (with `sql` set to `None`) when the rollback
    /// fails.
    pub async fn rollback<'a>(
        &'a self,
        tx: Transaction<'a, Postgres>,
    ) -> Result<(), DbAdapterError> {
        self.logger.debug("Rollback transaction");
        tx.rollback().await.map_err(|e| DbAdapterError {
            msg: e.to_string(),
            sql: None,
        })
    }
}