tank_postgres/
transaction.rs1use crate::{
2 PostgresConnection, PostgresDriver, PostgresPrepared, ValueWrap,
3 util::stream_postgres_row_to_tank_row,
4};
5use tank_core::{
6 AsQuery, Error, Executor, Query, QueryResult, RawQuery, Result, Transaction,
7 future::{Either, TryFutureExt},
8 stream::{Stream, TryStreamExt},
9};
10
11pub struct PostgresTransaction<'c>(pub(crate) tokio_postgres::Transaction<'c>);
15
16impl<'c> PostgresTransaction<'c> {
17 pub async fn new(client: &'c mut PostgresConnection) -> Result<Self> {
18 Ok(Self(client.client.transaction().await.map_err(|e| {
19 log::error!("{:#}", e);
20 e
21 })?))
22 }
23}
24
25impl<'c> Executor for PostgresTransaction<'c> {
26 type Driver = PostgresDriver;
27
28 async fn do_prepare(&mut self, sql: String) -> Result<Query<Self::Driver>> {
29 Ok(
30 PostgresPrepared::new(self.0.prepare(&sql).await.map_err(|e| {
31 let error = Error::new(e);
32 log::error!("{:#}", error);
33 error
34 })?)
35 .into(),
36 )
37 }
38 fn run<'s>(
39 &'s mut self,
40 query: impl AsQuery<Self::Driver> + 's,
41 ) -> impl Stream<Item = Result<QueryResult>> + Send {
42 let mut query = query.as_query();
43 stream_postgres_row_to_tank_row(async move || match query.as_mut() {
44 Query::Raw(RawQuery(sql)) => {
45 let stream = self
46 .0
47 .query_raw(sql.as_str(), Vec::<ValueWrap>::new())
48 .await?;
49 Ok(Either::Left(stream))
50 }
51 Query::Prepared(prepared) => {
52 let params = prepared.take_params();
53 let portal = self
54 .0
55 .bind_raw(&prepared.statement, params.into_iter())
56 .await?;
57 Ok(Either::Right(self.0.query_portal_raw(&portal, 0).await?))
58 }
59 })
60 .map_err(|e| {
61 log::error!("{:#}", e);
62 e
63 })
64 }
65}
66
67impl<'c> Transaction<'c> for PostgresTransaction<'c> {
68 fn commit(self) -> impl Future<Output = Result<()>> {
69 self.0.commit().map_err(|e| {
70 let e = Error::new(e);
71 log::error!("{:#}", e);
72 e
73 })
74 }
75
76 fn rollback(self) -> impl Future<Output = Result<()>> {
77 self.0.rollback().map_err(|e| {
78 let e = Error::new(e);
79 log::error!("{:#}", e);
80 e
81 })
82 }
83}