use crate::core::condition::SqlValue;
use crate::core::query::QueryBuilder;
use crate::core::sqlx::pg as sqlx_pg;
use crate::orm::postgres::model::PgModel;
/// Transaction context wrapping a live `sqlx` PostgreSQL transaction.
///
/// The query helpers implemented on this type execute their statements
/// against the wrapped transaction.
pub struct TxCtx<'c> {
// The underlying sqlx transaction that every statement issued through this context runs on.
inner: sqlx::Transaction<'c, sqlx::Postgres>,
}
// Local shorthand for the ORM's model query type, so signatures in this file stay short.
type ModelQuery<M> = crate::orm::model_query::ModelQuery<M>;
impl<'c> TxCtx<'c> {
fn new(inner: sqlx::Transaction<'c, sqlx::Postgres>) -> Self {
Self { inner }
}
pub async fn commit(self) -> Result<(), sqlx::Error> {
self.inner.commit().await
}
pub async fn rollback(self) -> Result<(), sqlx::Error> {
self.inner.rollback().await
}
pub async fn savepoint(&mut self, name: &str) -> Result<(), sqlx::Error> {
let sql = format!("SAVEPOINT \"{name}\"");
sqlx::query(&sql).execute(&mut *self.inner).await?;
Ok(())
}
pub async fn rollback_to(&mut self, name: &str) -> Result<(), sqlx::Error> {
let sql = format!("ROLLBACK TO SAVEPOINT \"{name}\"");
sqlx::query(&sql).execute(&mut *self.inner).await?;
Ok(())
}
pub async fn release(&mut self, name: &str) -> Result<(), sqlx::Error> {
let sql = format!("RELEASE SAVEPOINT \"{name}\"");
sqlx::query(&sql).execute(&mut *self.inner).await?;
Ok(())
}
pub async fn fetch_all<T>(&mut self, query: ModelQuery<T>) -> Result<Vec<T>, sqlx::Error>
where
T: PgModel + Sync,
{
let builder = query.into_final_builder();
let (sql, params) = builder.to_sql();
sqlx_pg::build_query_as::<T>(&sql, params)
.fetch_all(&mut *self.inner)
.await
}
pub async fn fetch_optional<T>(
&mut self,
query: ModelQuery<T>,
) -> Result<Option<T>, sqlx::Error>
where
T: PgModel + Sync,
{
let builder = query.into_final_builder();
let (sql, params) = builder.to_sql();
sqlx_pg::build_query_as::<T>(&sql, params)
.fetch_optional(&mut *self.inner)
.await
}
pub async fn create<T>(&mut self, data: &[(&str, SqlValue)]) -> Result<u64, sqlx::Error>
where
T: PgModel + Sync,
{
let (sql, params) = QueryBuilder::<T>::insert_sql(T::table_name(), data);
let result = sqlx_pg::build_query(&sql, params)
.execute(&mut *self.inner)
.await?;
Ok(result.rows_affected())
}
pub async fn create_returning<T>(&mut self, data: &[(&str, SqlValue)]) -> Result<T, sqlx::Error>
where
T: PgModel + Sync,
{
let (base_sql, params) = QueryBuilder::<T>::insert_sql(T::table_name(), data);
let sql = format!("{base_sql} RETURNING *");
sqlx_pg::build_query_as::<T>(&sql, params)
.fetch_optional(&mut *self.inner)
.await?
.ok_or(sqlx::Error::RowNotFound)
}
pub async fn update<T>(
&mut self,
query: ModelQuery<T>,
data: &[(&str, SqlValue)],
) -> Result<u64, sqlx::Error>
where
T: PgModel + Sync,
{
let builder = query.into_final_builder();
let (sql, params) = builder.to_update_sql(data);
let result = sqlx_pg::build_query(&sql, params)
.execute(&mut *self.inner)
.await?;
Ok(result.rows_affected())
}
pub async fn update_by_pk<T>(
&mut self,
id: impl Into<SqlValue> + Send,
data: &[(&str, SqlValue)],
) -> Result<u64, sqlx::Error>
where
T: PgModel + Sync,
{
let query = T::find_query(id.into());
self.update::<T>(query, data).await
}
pub async fn delete<T>(&mut self, query: ModelQuery<T>) -> Result<u64, sqlx::Error>
where
T: PgModel + Sync,
{
let builder = query.into_final_builder();
let (sql, params) = builder.to_delete_sql();
let result = sqlx_pg::build_query(&sql, params)
.execute(&mut *self.inner)
.await?;
Ok(result.rows_affected())
}
pub async fn delete_by_pk<T>(
&mut self,
id: impl Into<SqlValue> + Send,
) -> Result<u64, sqlx::Error>
where
T: PgModel + Sync,
{
let query = T::find_query(id.into());
self.delete::<T>(query).await
}
}
/// Entry point for opening transaction contexts on a PostgreSQL pool.
pub struct TransactionService;

impl TransactionService {
    /// Begins a transaction on `pool` and wraps it in a [`TxCtx`].
    ///
    /// # Errors
    /// Returns the `sqlx::Error` reported by the pool when the transaction
    /// cannot be started.
    pub async fn begin(pool: &sqlx::PgPool) -> Result<TxCtx<'_>, sqlx::Error> {
        pool.begin().await.map(TxCtx::new)
    }
}