Skip to main content

tank_postgres/
transaction.rs

1use crate::{
2    PostgresConnection, PostgresDriver, PostgresPrepared, ValueWrap,
3    util::stream_postgres_row_to_tank_row,
4};
5use tank_core::{
6    AsQuery, Error, Executor, Query, QueryResult, RawQuery, Result, Transaction,
7    future::{Either, TryFutureExt},
8    stream::{Stream, TryStreamExt},
9};
10
11/// Transaction wrapper for Postgres databases.
12///
13/// Implements `Executor` and `Transaction` to allow using a Postgres transaction via the `tank_core` abstractions.
14pub struct PostgresTransaction<'c>(pub(crate) tokio_postgres::Transaction<'c>);
15
16impl<'c> PostgresTransaction<'c> {
17    pub async fn new(client: &'c mut PostgresConnection) -> Result<Self> {
18        Ok(Self(client.client.transaction().await.map_err(|e| {
19            log::error!("{:#}", e);
20            e
21        })?))
22    }
23}
24
25impl<'c> Executor for PostgresTransaction<'c> {
26    type Driver = PostgresDriver;
27
28    async fn do_prepare(&mut self, sql: String) -> Result<Query<Self::Driver>> {
29        Ok(
30            PostgresPrepared::new(self.0.prepare(&sql).await.map_err(|e| {
31                let error = Error::new(e);
32                log::error!("{:#}", error);
33                error
34            })?)
35            .into(),
36        )
37    }
38    fn run<'s>(
39        &'s mut self,
40        query: impl AsQuery<Self::Driver> + 's,
41    ) -> impl Stream<Item = Result<QueryResult>> + Send {
42        let mut query = query.as_query();
43        stream_postgres_row_to_tank_row(async move || match query.as_mut() {
44            Query::Raw(RawQuery(sql)) => {
45                let stream = self
46                    .0
47                    .query_raw(sql.as_str(), Vec::<ValueWrap>::new())
48                    .await?;
49                Ok(Either::Left(stream))
50            }
51            Query::Prepared(prepared) => {
52                let params = prepared.take_params();
53                let portal = self
54                    .0
55                    .bind_raw(&prepared.statement, params.into_iter())
56                    .await?;
57                Ok(Either::Right(self.0.query_portal_raw(&portal, 0).await?))
58            }
59        })
60        .map_err(|e| {
61            log::error!("{:#}", e);
62            e
63        })
64    }
65}
66
67impl<'c> Transaction<'c> for PostgresTransaction<'c> {
68    fn commit(self) -> impl Future<Output = Result<()>> {
69        self.0.commit().map_err(|e| {
70            let e = Error::new(e);
71            log::error!("{:#}", e);
72            e
73        })
74    }
75
76    fn rollback(self) -> impl Future<Output = Result<()>> {
77        self.0.rollback().map_err(|e| {
78            let e = Error::new(e);
79            log::error!("{:#}", e);
80            e
81        })
82    }
83}