use drizzle_core::error::DrizzleError;
use drizzle_core::traits::ToSQL;
use drizzle_postgres::builder::{DeleteInitial, InsertInitial, SelectInitial, UpdateInitial};
use drizzle_postgres::traits::PostgresTable;
use std::cell::RefCell;
use std::marker::PhantomData;
use tokio_postgres::{Row, Transaction as TokioPgTransaction};
pub mod delete;
pub mod insert;
pub mod select;
pub mod update;
use drizzle_postgres::builder::{
self, QueryBuilder, delete::DeleteBuilder, insert::InsertBuilder, select::SelectBuilder,
update::UpdateBuilder,
};
use drizzle_postgres::common::PostgresTransactionType;
use drizzle_postgres::values::PostgresValue;
#[derive(Debug)]
pub struct TransactionBuilder<'a, 'conn, Schema, Builder, State> {
transaction: &'a Transaction<'conn, Schema>,
builder: Builder,
_phantom: PhantomData<(Schema, State)>,
}
pub struct Transaction<'conn, Schema = ()> {
tx: RefCell<Option<TokioPgTransaction<'conn>>>,
tx_type: PostgresTransactionType,
_schema: PhantomData<Schema>,
}
impl<'conn, Schema> std::fmt::Debug for Transaction<'conn, Schema> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Transaction")
.field("tx_type", &self.tx_type)
.field("is_active", &self.tx.borrow().is_some())
.finish()
}
}
impl<'conn, Schema> Transaction<'conn, Schema> {
pub(crate) fn new(tx: TokioPgTransaction<'conn>, tx_type: PostgresTransactionType) -> Self {
Self {
tx: RefCell::new(Some(tx)),
tx_type,
_schema: PhantomData,
}
}
#[inline]
pub fn tx_type(&self) -> PostgresTransactionType {
self.tx_type
}
pub fn select<'a, 'b, T>(
&'a self,
query: T,
) -> TransactionBuilder<
'a,
'conn,
Schema,
SelectBuilder<'b, Schema, SelectInitial>,
SelectInitial,
>
where
T: ToSQL<'b, PostgresValue<'b>>,
{
use drizzle_postgres::builder::QueryBuilder;
let builder = QueryBuilder::new::<Schema>().select(query);
TransactionBuilder {
transaction: self,
builder,
_phantom: PhantomData,
}
}
pub fn insert<'a, Table>(
&'a self,
table: Table,
) -> TransactionBuilder<
'a,
'conn,
Schema,
InsertBuilder<'a, Schema, InsertInitial, Table>,
InsertInitial,
>
where
Table: PostgresTable<'a>,
{
let builder = QueryBuilder::new::<Schema>().insert(table);
TransactionBuilder {
transaction: self,
builder,
_phantom: PhantomData,
}
}
pub fn update<'a, Table>(
&'a self,
table: Table,
) -> TransactionBuilder<
'a,
'conn,
Schema,
UpdateBuilder<'a, Schema, UpdateInitial, Table>,
UpdateInitial,
>
where
Table: PostgresTable<'a>,
{
let builder = QueryBuilder::new::<Schema>().update(table);
TransactionBuilder {
transaction: self,
builder,
_phantom: PhantomData,
}
}
pub fn delete<'a, T>(
&'a self,
table: T,
) -> TransactionBuilder<
'a,
'conn,
Schema,
DeleteBuilder<'a, Schema, DeleteInitial, T>,
DeleteInitial,
>
where
T: PostgresTable<'a>,
{
let builder = QueryBuilder::new::<Schema>().delete(table);
TransactionBuilder {
transaction: self,
builder,
_phantom: PhantomData,
}
}
pub fn with<'a, C>(
&'a self,
cte: C,
) -> TransactionBuilder<
'a,
'conn,
Schema,
QueryBuilder<'a, Schema, builder::CTEInit>,
builder::CTEInit,
>
where
C: builder::CTEDefinition<'a>,
{
let builder = QueryBuilder::new::<Schema>().with(cte);
TransactionBuilder {
transaction: self,
builder,
_phantom: PhantomData,
}
}
pub async fn execute<'a, T>(&'a self, query: T) -> Result<u64, tokio_postgres::Error>
where
T: ToSQL<'a, PostgresValue<'a>>,
{
let query_sql = query.to_sql();
let sql = query_sql.sql();
let params = query_sql.params();
let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
.map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
.collect();
let tx_ref = self.tx.borrow();
let tx = tx_ref.as_ref().expect("Transaction already consumed");
tx.execute(&sql, ¶m_refs[..]).await
}
pub async fn all<'a, T, R, C>(&'a self, query: T) -> drizzle_core::error::Result<C>
where
R: for<'r> TryFrom<&'r Row>,
for<'r> <R as TryFrom<&'r Row>>::Error: Into<drizzle_core::error::DrizzleError>,
T: ToSQL<'a, PostgresValue<'a>>,
C: std::iter::FromIterator<R>,
{
let sql = query.to_sql();
let sql_str = sql.sql();
let params = sql.params();
let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
.map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
.collect();
let tx_ref = self.tx.borrow();
let tx = tx_ref.as_ref().expect("Transaction already consumed");
let rows = tx
.query(&sql_str, ¶m_refs[..])
.await
.map_err(|e| DrizzleError::Other(e.to_string().into()))?;
let results = rows
.iter()
.map(|row| R::try_from(row).map_err(Into::into))
.collect::<Result<C, _>>()?;
Ok(results)
}
pub async fn get<'a, T, R>(&'a self, query: T) -> drizzle_core::error::Result<R>
where
R: for<'r> TryFrom<&'r Row>,
for<'r> <R as TryFrom<&'r Row>>::Error: Into<drizzle_core::error::DrizzleError>,
T: ToSQL<'a, PostgresValue<'a>>,
{
let sql = query.to_sql();
let sql_str = sql.sql();
let params = sql.params();
let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
.map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
.collect();
let tx_ref = self.tx.borrow();
let tx = tx_ref.as_ref().expect("Transaction already consumed");
let row = tx
.query_one(&sql_str, ¶m_refs[..])
.await
.map_err(|e| DrizzleError::Other(e.to_string().into()))?;
R::try_from(&row).map_err(Into::into)
}
pub(crate) async fn commit(&self) -> drizzle_core::error::Result<()> {
let tx = self
.tx
.borrow_mut()
.take()
.expect("Transaction already consumed");
tx.commit()
.await
.map_err(|e| DrizzleError::Other(e.to_string().into()))
}
pub(crate) async fn rollback(&self) -> drizzle_core::error::Result<()> {
let tx = self
.tx
.borrow_mut()
.take()
.expect("Transaction already consumed");
tx.rollback()
.await
.map_err(|e| DrizzleError::Other(e.to_string().into()))
}
}
impl<'a, 'conn, Schema>
TransactionBuilder<
'a,
'conn,
Schema,
QueryBuilder<'a, Schema, builder::CTEInit>,
builder::CTEInit,
>
{
#[inline]
pub fn select<T>(
self,
query: T,
) -> TransactionBuilder<
'a,
'conn,
Schema,
SelectBuilder<'a, Schema, SelectInitial>,
SelectInitial,
>
where
T: ToSQL<'a, PostgresValue<'a>>,
{
let builder = self.builder.select(query);
TransactionBuilder {
transaction: self.transaction,
builder,
_phantom: PhantomData,
}
}
#[inline]
pub fn with<C>(
self,
cte: C,
) -> TransactionBuilder<
'a,
'conn,
Schema,
QueryBuilder<'a, Schema, builder::CTEInit>,
builder::CTEInit,
>
where
C: builder::CTEDefinition<'a>,
{
let builder = self.builder.with(cte);
TransactionBuilder {
transaction: self.transaction,
builder,
_phantom: PhantomData,
}
}
}
impl<'a, 'conn, S, Schema, State, Table>
TransactionBuilder<'a, 'conn, S, QueryBuilder<'a, Schema, State, Table>, State>
where
State: builder::ExecutableState,
{
pub async fn execute(self) -> drizzle_core::error::Result<u64> {
let sql_str = self.builder.sql.sql();
let params = self.builder.sql.params();
let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
.map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
.collect();
let tx_ref = self.transaction.tx.borrow();
let tx = tx_ref.as_ref().expect("Transaction already consumed");
Ok(tx
.execute(&sql_str, ¶m_refs[..])
.await
.map_err(|e| DrizzleError::Other(e.to_string().into()))?)
}
pub async fn all<R, C>(self) -> drizzle_core::error::Result<C>
where
R: for<'r> TryFrom<&'r Row>,
for<'r> <R as TryFrom<&'r Row>>::Error: Into<drizzle_core::error::DrizzleError>,
C: FromIterator<R>,
{
let sql_str = self.builder.sql.sql();
let params = self.builder.sql.params();
let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
.map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
.collect();
let tx_ref = self.transaction.tx.borrow();
let tx = tx_ref.as_ref().expect("Transaction already consumed");
let rows = tx
.query(&sql_str, ¶m_refs[..])
.await
.map_err(|e| DrizzleError::Other(e.to_string().into()))?;
let results = rows
.iter()
.map(|row| R::try_from(row).map_err(Into::into))
.collect::<Result<C, _>>()?;
Ok(results)
}
pub async fn get<R>(self) -> drizzle_core::error::Result<R>
where
R: for<'r> TryFrom<&'r Row>,
for<'r> <R as TryFrom<&'r Row>>::Error: Into<drizzle_core::error::DrizzleError>,
{
let sql_str = self.builder.sql.sql();
let params = self.builder.sql.params();
let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
.map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
.collect();
let tx_ref = self.transaction.tx.borrow();
let tx = tx_ref.as_ref().expect("Transaction already consumed");
let row = tx
.query_one(&sql_str, ¶m_refs[..])
.await
.map_err(|e| DrizzleError::Other(e.to_string().into()))?;
R::try_from(&row).map_err(Into::into)
}
}
impl<'a, 'conn, S, T, State> ToSQL<'a, PostgresValue<'a>>
for TransactionBuilder<'a, 'conn, S, T, State>
where
T: ToSQL<'a, PostgresValue<'a>>,
{
fn to_sql(&self) -> drizzle_core::sql::SQL<'a, PostgresValue<'a>> {
self.builder.to_sql()
}
}