tank_postgres/
transaction.rs1use crate::{
2 PostgresConnection, PostgresDriver, PostgresPrepared, ValueWrap,
3 util::stream_postgres_row_to_tank_row,
4};
5use tank_core::{
6 Error, Executor, Query, QueryResult, Result, Transaction,
7 future::{Either, TryFutureExt},
8 stream::{Stream, TryStreamExt},
9};
10
11pub struct PostgresTransaction<'c>(pub(crate) tokio_postgres::Transaction<'c>);
12
13impl<'c> PostgresTransaction<'c> {
14 pub async fn new(client: &'c mut PostgresConnection) -> Result<Self> {
15 Ok(Self(client.client.transaction().await.map_err(|e| {
16 log::error!("{:#}", e);
17 e
18 })?))
19 }
20}
21
22impl<'c> Executor for PostgresTransaction<'c> {
23 type Driver = PostgresDriver;
24 fn driver(&self) -> &Self::Driver {
25 &PostgresDriver {}
26 }
27 async fn prepare(&mut self, query: String) -> Result<Query<Self::Driver>> {
28 Ok(
29 PostgresPrepared::new(self.0.prepare(&query).await.map_err(|e| {
30 let e = Error::new(e);
31 log::error!("{:#}", e);
32 e
33 })?)
34 .into(),
35 )
36 }
37 fn run(
38 &mut self,
39 query: Query<Self::Driver>,
40 ) -> impl Stream<Item = Result<QueryResult>> + Send {
41 stream_postgres_row_to_tank_row(async move || match query {
42 Query::Raw(sql) => Ok(Either::Left(
43 self.0.query_raw(&sql, Vec::<ValueWrap>::new()).await?,
44 )),
45 Query::Prepared(mut prepared) => {
46 let portal = if !prepared.is_complete() {
47 prepared.complete(self).await?
48 } else {
49 prepared.get_portal().ok_or(Error::msg(format!(
50 "The prepared statement `{}` is not complete",
51 prepared
52 )))?
53 };
54 Ok(Either::Right(self.0.query_portal_raw(&portal, 0).await?))
55 }
56 })
57 .map_err(|e| {
58 log::error!("{:#}", e);
59 e
60 })
61 }
62}
63
64impl<'c> Transaction<'c> for PostgresTransaction<'c> {
65 fn commit(self) -> impl Future<Output = Result<()>> {
66 self.0.commit().map_err(|e| {
67 let e = Error::new(e);
68 log::error!("{:#}", e);
69 e
70 })
71 }
72 fn rollback(self) -> impl Future<Output = Result<()>> {
73 self.0.rollback().map_err(|e| {
74 let e = Error::new(e);
75 log::error!("{:#}", e);
76 e
77 })
78 }
79}