Skip to main content

tank_postgres/
transaction.rs

1use crate::{
2    PostgresConnection, PostgresDriver, PostgresPrepared, ValueWrap,
3    util::stream_postgres_row_to_tank_row,
4};
5use tank_core::{
6    AsQuery, Error, Executor, Query, QueryResult, RawQuery, Result, Transaction,
7    future::{Either, TryFutureExt},
8    stream::{Stream, TryStreamExt},
9};
10
11/// Postgres transaction.
12pub struct PostgresTransaction<'c>(pub(crate) tokio_postgres::Transaction<'c>);
13
14impl<'c> PostgresTransaction<'c> {
15    pub async fn new(client: &'c mut PostgresConnection) -> Result<Self> {
16        Ok(Self(client.client.transaction().await.map_err(|e| {
17            log::error!("{:#}", e);
18            e
19        })?))
20    }
21}
22
23impl<'c> Executor for PostgresTransaction<'c> {
24    type Driver = PostgresDriver;
25
26    async fn do_prepare(&mut self, sql: String) -> Result<Query<Self::Driver>> {
27        Ok(
28            PostgresPrepared::new(self.0.prepare(&sql).await.map_err(|e| {
29                let error = Error::new(e);
30                log::error!("{:#}", error);
31                error
32            })?)
33            .into(),
34        )
35    }
36    fn run<'s>(
37        &'s mut self,
38        query: impl AsQuery<Self::Driver> + 's,
39    ) -> impl Stream<Item = Result<QueryResult>> + Send {
40        let mut query = query.as_query();
41        stream_postgres_row_to_tank_row(async move || match query.as_mut() {
42            Query::Raw(RawQuery(sql)) => {
43                let stream = self
44                    .0
45                    .query_raw(sql.as_str(), Vec::<ValueWrap>::new())
46                    .await?;
47                Ok(Either::Left(stream))
48            }
49            Query::Prepared(prepared) => {
50                let params = prepared.take_params();
51                let portal = self
52                    .0
53                    .bind_raw(&prepared.statement, params.into_iter())
54                    .await?;
55                Ok(Either::Right(self.0.query_portal_raw(&portal, 0).await?))
56            }
57        })
58        .map_err(|e| {
59            log::error!("{:#}", e);
60            e
61        })
62    }
63}
64
65impl<'c> Transaction<'c> for PostgresTransaction<'c> {
66    fn commit(self) -> impl Future<Output = Result<()>> {
67        self.0.commit().map_err(|e| {
68            let e = Error::new(e);
69            log::error!("{:#}", e);
70            e
71        })
72    }
73
74    fn rollback(self) -> impl Future<Output = Result<()>> {
75        self.0.rollback().map_err(|e| {
76            let e = Error::new(e);
77            log::error!("{:#}", e);
78            e
79        })
80    }
81}