use std::sync::Arc;
use deadpool_postgres::Object;
use tokio_postgres::Row;
use tracing::{debug, trace};
use crate::error::PgResult;
use crate::statement::PreparedStatementCache;
/// A database connection handle that pairs a `deadpool_postgres` client
/// object with a shared prepared-statement cache.
pub struct PgConnection {
/// Underlying client object every query on this connection is sent through.
client: Object,
/// Prepared-statement cache held behind an `Arc`; a clone of this handle is
/// given to each transaction started from this connection.
statement_cache: Arc<PreparedStatementCache>,
}
impl PgConnection {
    /// Builds a connection wrapper from a client object and the statement
    /// cache it should consult.
    pub(crate) fn new(client: Object, statement_cache: Arc<PreparedStatementCache>) -> Self {
        Self { client, statement_cache }
    }

    /// Runs `sql` with `params` and returns every resulting row.
    ///
    /// The statement is obtained from the statement cache
    /// (`get_or_prepare`) before it is handed to the client.
    ///
    /// # Errors
    ///
    /// Propagates any failure from obtaining the statement or from running it.
    pub async fn query(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<Vec<Row>> {
        trace!(sql = %sql, "Executing query");
        let prepared = self.statement_cache.get_or_prepare(&self.client, sql).await?;
        Ok(self.client.query(&prepared, params).await?)
    }

    /// Runs `sql` with `params` through the client's `query_one` and returns
    /// the single resulting row.
    ///
    /// The statement is obtained from the statement cache first.
    ///
    /// # Errors
    ///
    /// Propagates any failure from obtaining the statement or from the
    /// client's `query_one` call.
    pub async fn query_one(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<Row> {
        trace!(sql = %sql, "Executing query_one");
        let prepared = self.statement_cache.get_or_prepare(&self.client, sql).await?;
        Ok(self.client.query_one(&prepared, params).await?)
    }

    /// Runs `sql` with `params` through the client's `query_opt` and returns
    /// the optional resulting row.
    ///
    /// The statement is obtained from the statement cache first.
    ///
    /// # Errors
    ///
    /// Propagates any failure from obtaining the statement or from the
    /// client's `query_opt` call.
    pub async fn query_opt(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<Option<Row>> {
        trace!(sql = %sql, "Executing query_opt");
        let prepared = self.statement_cache.get_or_prepare(&self.client, sql).await?;
        Ok(self.client.query_opt(&prepared, params).await?)
    }

    /// Executes `sql` with `params` and returns the `u64` count reported by
    /// the client's `execute` call.
    ///
    /// The statement is obtained from the statement cache first.
    ///
    /// # Errors
    ///
    /// Propagates any failure from obtaining the statement or from running it.
    pub async fn execute(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<u64> {
        trace!(sql = %sql, "Executing statement");
        let prepared = self.statement_cache.get_or_prepare(&self.client, sql).await?;
        Ok(self.client.execute(&prepared, params).await?)
    }

    /// Sends `sql` to the client's `batch_execute`; takes no parameters and
    /// does not consult the statement cache.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by the client.
    pub async fn batch_execute(&self, sql: &str) -> PgResult<()> {
        trace!(sql = %sql, "Executing batch");
        Ok(self.client.batch_execute(sql).await?)
    }

    /// Starts a transaction on this connection.
    ///
    /// The returned [`PgTransaction`] mutably borrows the connection and
    /// receives its own handle to the same statement cache.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the client's `transaction` call.
    pub async fn transaction(&mut self) -> PgResult<PgTransaction<'_>> {
        debug!("Beginning transaction");
        let statement_cache = Arc::clone(&self.statement_cache);
        let txn = self.client.transaction().await?;
        Ok(PgTransaction { txn, statement_cache })
    }

    /// Borrows the wrapped client object.
    pub fn inner(&self) -> &Object {
        &self.client
    }

    /// Alias for [`PgConnection::query`]; forwards both arguments unchanged.
    #[inline]
    pub async fn query_cached(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<Vec<Row>> {
        self.query(sql, params).await
    }

    /// Runs `sql` with `params` by passing the SQL text straight to the
    /// client, without going through the statement cache.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by the client.
    pub async fn query_raw(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<Vec<Row>> {
        trace!(sql = %sql, "Executing raw query (no statement cache)");
        Ok(self.client.query(sql, params).await?)
    }

    /// Like [`PgConnection::query_raw`], but uses the client's `query_opt`
    /// and returns the optional resulting row.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by the client.
    pub async fn query_opt_raw(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<Option<Row>> {
        trace!(sql = %sql, "Executing raw query_opt (no statement cache)");
        Ok(self.client.query_opt(sql, params).await?)
    }
}
/// An open database transaction.
///
/// Wraps a `deadpool_postgres::Transaction` that lives for `'a`, together
/// with a handle to a shared prepared-statement cache.
pub struct PgTransaction<'a> {
/// Underlying transaction object all statements are sent through.
txn: deadpool_postgres::Transaction<'a>,
/// Prepared-statement cache consulted (via `get_or_prepare_in_txn`) before
/// running statements in this transaction.
statement_cache: Arc<PreparedStatementCache>,
}
impl<'a> PgTransaction<'a> {
    /// Renders `name` as a double-quoted PostgreSQL identifier.
    ///
    /// Any embedded `"` is doubled, so the whole value is always parsed as a
    /// single identifier. Without this, a name containing `;`, whitespace or
    /// a reserved word would either break the savepoint statement or, since
    /// the statement is sent through `batch_execute`, run additional SQL.
    ///
    /// Note that quoted identifiers are case-sensitive: `"SP1"` and `"sp1"`
    /// name different savepoints.
    fn quoted_savepoint_name(name: &str) -> String {
        format!("\"{}\"", name.replace('"', "\"\""))
    }

    /// Runs `sql` with `params` inside the transaction and returns every
    /// resulting row.
    ///
    /// The statement is obtained from the statement cache
    /// (`get_or_prepare_in_txn`) before it is executed.
    ///
    /// # Errors
    ///
    /// Propagates any failure from obtaining the statement or from running it.
    pub async fn query(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<Vec<Row>> {
        trace!(sql = %sql, "Executing query in transaction");
        let stmt = self
            .statement_cache
            .get_or_prepare_in_txn(&self.txn, sql)
            .await?;
        let rows = self.txn.query(&stmt, params).await?;
        Ok(rows)
    }

    /// Runs `sql` with `params` through the transaction's `query_one` and
    /// returns the single resulting row.
    ///
    /// # Errors
    ///
    /// Propagates any failure from obtaining the statement or from the
    /// underlying `query_one` call.
    pub async fn query_one(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<Row> {
        // Logged like the other query paths so transaction traffic is traceable.
        trace!(sql = %sql, "Executing query_one in transaction");
        let stmt = self
            .statement_cache
            .get_or_prepare_in_txn(&self.txn, sql)
            .await?;
        let row = self.txn.query_one(&stmt, params).await?;
        Ok(row)
    }

    /// Runs `sql` with `params` through the transaction's `query_opt` and
    /// returns the optional resulting row.
    ///
    /// # Errors
    ///
    /// Propagates any failure from obtaining the statement or from the
    /// underlying `query_opt` call.
    pub async fn query_opt(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<Option<Row>> {
        trace!(sql = %sql, "Executing query_opt in transaction");
        let stmt = self
            .statement_cache
            .get_or_prepare_in_txn(&self.txn, sql)
            .await?;
        let row = self.txn.query_opt(&stmt, params).await?;
        Ok(row)
    }

    /// Executes `sql` with `params` inside the transaction and returns the
    /// `u64` count reported by the underlying `execute` call.
    ///
    /// # Errors
    ///
    /// Propagates any failure from obtaining the statement or from running it.
    pub async fn execute(
        &self,
        sql: &str,
        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    ) -> PgResult<u64> {
        trace!(sql = %sql, "Executing statement in transaction");
        let stmt = self
            .statement_cache
            .get_or_prepare_in_txn(&self.txn, sql)
            .await?;
        let count = self.txn.execute(&stmt, params).await?;
        Ok(count)
    }

    /// Creates a savepoint called `name` inside this transaction.
    ///
    /// The name is sent as a quoted identifier, so it is matched
    /// case-sensitively by [`PgTransaction::rollback_to`] and
    /// [`PgTransaction::release_savepoint`].
    ///
    /// # Errors
    ///
    /// Propagates any failure reported when the statement is executed.
    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
        debug!(name = %name, "Creating savepoint");
        let sql = format!("SAVEPOINT {}", Self::quoted_savepoint_name(name));
        self.txn.batch_execute(&sql).await?;
        Ok(())
    }

    /// Rolls the transaction back to the savepoint called `name`.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported when the statement is executed.
    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
        debug!(name = %name, "Rolling back to savepoint");
        let sql = format!(
            "ROLLBACK TO SAVEPOINT {}",
            Self::quoted_savepoint_name(name)
        );
        self.txn.batch_execute(&sql).await?;
        Ok(())
    }

    /// Releases the savepoint called `name`.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported when the statement is executed.
    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
        debug!(name = %name, "Releasing savepoint");
        let sql = format!("RELEASE SAVEPOINT {}", Self::quoted_savepoint_name(name));
        self.txn.batch_execute(&sql).await?;
        Ok(())
    }

    /// Commits the transaction, consuming it.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the underlying `commit` call.
    pub async fn commit(self) -> PgResult<()> {
        debug!("Committing transaction");
        self.txn.commit().await?;
        Ok(())
    }

    /// Rolls the transaction back, consuming it.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the underlying `rollback` call.
    pub async fn rollback(self) -> PgResult<()> {
        debug!("Rolling back transaction");
        self.txn.rollback().await?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
// No unit tests are defined in this module yet.
}