use crate::{
CancelToken, CopyInWriter, CopyOutReader, Portal, RowIter, Rt, Statement, ToStatement,
};
use tokio::runtime::Runtime;
use tokio_postgres::types::{ToSql, Type};
use tokio_postgres::{Error, Row, SimpleQueryMessage};
pub struct Transaction<'a> {
runtime: &'a mut Runtime,
transaction: tokio_postgres::Transaction<'a>,
}
impl<'a> Transaction<'a> {
pub(crate) fn new(
runtime: &'a mut Runtime,
transaction: tokio_postgres::Transaction<'a>,
) -> Transaction<'a> {
Transaction {
runtime,
transaction,
}
}
fn rt(&mut self) -> Rt<'_> {
Rt(self.runtime)
}
pub fn commit(self) -> Result<(), Error> {
self.runtime.block_on(self.transaction.commit())
}
pub fn rollback(self) -> Result<(), Error> {
self.runtime.block_on(self.transaction.rollback())
}
pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
self.runtime.block_on(self.transaction.prepare(query))
}
pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
self.runtime
.block_on(self.transaction.prepare_typed(query, types))
}
pub fn execute<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error>
where
T: ?Sized + ToStatement,
{
self.runtime
.block_on(self.transaction.execute(query, params))
}
pub fn query<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error>
where
T: ?Sized + ToStatement,
{
self.runtime.block_on(self.transaction.query(query, params))
}
pub fn query_one<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Row, Error>
where
T: ?Sized + ToStatement,
{
self.runtime
.block_on(self.transaction.query_one(query, params))
}
pub fn query_opt<T>(
&mut self,
query: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<Option<Row>, Error>
where
T: ?Sized + ToStatement,
{
self.runtime
.block_on(self.transaction.query_opt(query, params))
}
pub fn query_raw<'b, T, I>(&mut self, query: &T, params: I) -> Result<RowIter<'_>, Error>
where
T: ?Sized + ToStatement,
I: IntoIterator<Item = &'b dyn ToSql>,
I::IntoIter: ExactSizeIterator,
{
let stream = self
.runtime
.block_on(self.transaction.query_raw(query, params))?;
Ok(RowIter::new(self.rt(), stream))
}
pub fn bind<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Portal, Error>
where
T: ?Sized + ToStatement,
{
self.runtime.block_on(self.transaction.bind(query, params))
}
pub fn query_portal(&mut self, portal: &Portal, max_rows: i32) -> Result<Vec<Row>, Error> {
self.runtime
.block_on(self.transaction.query_portal(portal, max_rows))
}
pub fn query_portal_raw(
&mut self,
portal: &Portal,
max_rows: i32,
) -> Result<RowIter<'_>, Error> {
let stream = self
.runtime
.block_on(self.transaction.query_portal_raw(portal, max_rows))?;
Ok(RowIter::new(self.rt(), stream))
}
pub fn copy_in<T>(&mut self, query: &T) -> Result<CopyInWriter<'_>, Error>
where
T: ?Sized + ToStatement,
{
let sink = self.runtime.block_on(self.transaction.copy_in(query))?;
Ok(CopyInWriter::new(self.rt(), sink))
}
pub fn copy_out<T>(&mut self, query: &T) -> Result<CopyOutReader<'_>, Error>
where
T: ?Sized + ToStatement,
{
let stream = self.runtime.block_on(self.transaction.copy_out(query))?;
Ok(CopyOutReader::new(self.rt(), stream))
}
pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
self.runtime.block_on(self.transaction.simple_query(query))
}
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
self.runtime.block_on(self.transaction.batch_execute(query))
}
pub fn cancel_token(&self) -> CancelToken {
CancelToken::new(self.transaction.cancel_token())
}
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
let transaction = self.runtime.block_on(self.transaction.transaction())?;
Ok(Transaction {
runtime: self.runtime,
transaction,
})
}
}