tank_postgres/
transaction.rs

1use std::mem;
2
3use crate::{
4    PostgresConnection, PostgresDriver, PostgresPrepared, ValueWrap,
5    util::stream_postgres_row_to_tank_row,
6};
7use tank_core::{
8    AsQuery, Error, Executor, Query, QueryResult, Result, Transaction,
9    future::{Either, TryFutureExt},
10    stream::{Stream, TryStreamExt},
11};
12
13pub struct PostgresTransaction<'c>(pub(crate) tokio_postgres::Transaction<'c>);
14
15impl<'c> PostgresTransaction<'c> {
16    pub async fn new(client: &'c mut PostgresConnection) -> Result<Self> {
17        Ok(Self(client.client.transaction().await.map_err(|e| {
18            log::error!("{:#}", e);
19            e
20        })?))
21    }
22}
23
24impl<'c> Executor for PostgresTransaction<'c> {
25    type Driver = PostgresDriver;
26    fn driver(&self) -> &Self::Driver {
27        &PostgresDriver {}
28    }
29    async fn prepare(&mut self, query: String) -> Result<Query<Self::Driver>> {
30        Ok(
31            PostgresPrepared::new(self.0.prepare(&query).await.map_err(|e| {
32                let error = Error::new(e);
33                log::error!("{:#}", error);
34                error
35            })?)
36            .into(),
37        )
38    }
39    fn run<'s>(
40        &'s mut self,
41        query: impl AsQuery<Self::Driver> + 's,
42    ) -> impl Stream<Item = Result<QueryResult>> + Send {
43        let mut query = query.as_query();
44        let mut owned = mem::take(query.as_mut());
45        stream_postgres_row_to_tank_row(async move || match &mut owned {
46            Query::Raw(sql) => {
47                let stream = self.0.query_raw(sql, Vec::<ValueWrap>::new()).await?;
48                Ok(Either::Left(stream))
49            }
50            Query::Prepared(prepared) => {
51                let portal = if !prepared.is_complete() {
52                    prepared.complete(self).await?
53                } else {
54                    prepared.get_portal().ok_or(Error::msg(format!(
55                        "The prepared statement `{}` is not complete",
56                        prepared
57                    )))?
58                };
59                Ok(Either::Right(self.0.query_portal_raw(&portal, 0).await?))
60            }
61        })
62        .map_err(|e| {
63            log::error!("{:#}", e);
64            e
65        })
66    }
67}
68
69impl<'c> Transaction<'c> for PostgresTransaction<'c> {
70    fn commit(self) -> impl Future<Output = Result<()>> {
71        self.0.commit().map_err(|e| {
72            let e = Error::new(e);
73            log::error!("{:#}", e);
74            e
75        })
76    }
77    fn rollback(self) -> impl Future<Output = Result<()>> {
78        self.0.rollback().map_err(|e| {
79            let e = Error::new(e);
80            log::error!("{:#}", e);
81            e
82        })
83    }
84}