tank_postgres/
transaction.rs

1use crate::{
2    PostgresConnection, PostgresDriver, PostgresPrepared, ValueWrap,
3    util::stream_postgres_row_to_tank_row,
4};
5use tank_core::{
6    Error, Executor, Query, QueryResult, Result, Transaction,
7    future::{Either, TryFutureExt},
8    stream::{Stream, TryStreamExt},
9};
10
11pub struct PostgresTransaction<'c>(pub(crate) tokio_postgres::Transaction<'c>);
12
13impl<'c> PostgresTransaction<'c> {
14    pub async fn new(client: &'c mut PostgresConnection) -> Result<Self> {
15        Ok(Self(client.client.transaction().await.map_err(|e| {
16            log::error!("{:#}", e);
17            e
18        })?))
19    }
20}
21
22impl<'c> Executor for PostgresTransaction<'c> {
23    type Driver = PostgresDriver;
24    fn driver(&self) -> &Self::Driver {
25        &PostgresDriver {}
26    }
27    async fn prepare(&mut self, query: String) -> Result<Query<Self::Driver>> {
28        Ok(
29            PostgresPrepared::new(self.0.prepare(&query).await.map_err(|e| {
30                let e = Error::new(e);
31                log::error!("{:#}", e);
32                e
33            })?)
34            .into(),
35        )
36    }
37    fn run(
38        &mut self,
39        query: Query<Self::Driver>,
40    ) -> impl Stream<Item = Result<QueryResult>> + Send {
41        stream_postgres_row_to_tank_row(async move || match query {
42            Query::Raw(sql) => Ok(Either::Left(
43                self.0.query_raw(&sql, Vec::<ValueWrap>::new()).await?,
44            )),
45            Query::Prepared(mut prepared) => {
46                let portal = if !prepared.is_complete() {
47                    prepared.complete(self).await?
48                } else {
49                    prepared.get_portal().ok_or(Error::msg(format!(
50                        "The prepared statement `{}` is not complete",
51                        prepared
52                    )))?
53                };
54                Ok(Either::Right(self.0.query_portal_raw(&portal, 0).await?))
55            }
56        })
57        .map_err(|e| {
58            log::error!("{:#}", e);
59            e
60        })
61    }
62}
63
64impl<'c> Transaction<'c> for PostgresTransaction<'c> {
65    fn commit(self) -> impl Future<Output = Result<()>> {
66        self.0.commit().map_err(|e| {
67            let e = Error::new(e);
68            log::error!("{:#}", e);
69            e
70        })
71    }
72    fn rollback(self) -> impl Future<Output = Result<()>> {
73        self.0.rollback().map_err(|e| {
74            let e = Error::new(e);
75            log::error!("{:#}", e);
76            e
77        })
78    }
79}