tank_postgres/
transaction.rs1use crate::{
2 PostgresConnection, PostgresDriver, PostgresPrepared, ValueWrap,
3 util::stream_postgres_row_to_tank_row,
4};
5use std::mem;
6use tank_core::{
7 AsQuery, Error, Executor, Query, QueryResult, Result, Transaction,
8 future::{Either, TryFutureExt},
9 stream::{Stream, TryStreamExt},
10};
11
12pub struct PostgresTransaction<'c>(pub(crate) tokio_postgres::Transaction<'c>);
16
17impl<'c> PostgresTransaction<'c> {
18 pub async fn new(client: &'c mut PostgresConnection) -> Result<Self> {
19 Ok(Self(client.client.transaction().await.map_err(|e| {
20 log::error!("{:#}", e);
21 e
22 })?))
23 }
24}
25
26impl<'c> Executor for PostgresTransaction<'c> {
27 type Driver = PostgresDriver;
28
29 async fn prepare(&mut self, query: String) -> Result<Query<Self::Driver>> {
30 Ok(
31 PostgresPrepared::new(self.0.prepare(&query).await.map_err(|e| {
32 let error = Error::new(e);
33 log::error!("{:#}", error);
34 error
35 })?)
36 .into(),
37 )
38 }
39 fn run<'s>(
40 &'s mut self,
41 query: impl AsQuery<Self::Driver> + 's,
42 ) -> impl Stream<Item = Result<QueryResult>> + Send {
43 let mut query = query.as_query();
44 let mut owned = mem::take(query.as_mut());
45 stream_postgres_row_to_tank_row(async move || match &mut owned {
46 Query::Raw(sql) => {
47 let stream = self.0.query_raw(sql, Vec::<ValueWrap>::new()).await?;
48 Ok(Either::Left(stream))
49 }
50 Query::Prepared(prepared) => {
51 let params = prepared.take_params();
52 let portal = self
53 .0
54 .bind_raw(&prepared.statement, params.into_iter())
55 .await?;
56 Ok(Either::Right(self.0.query_portal_raw(&portal, 0).await?))
57 }
58 })
59 .map_err(|e| {
60 log::error!("{:#}", e);
61 e
62 })
63 }
64}
65
66impl<'c> Transaction<'c> for PostgresTransaction<'c> {
67 fn commit(self) -> impl Future<Output = Result<()>> {
68 self.0.commit().map_err(|e| {
69 let e = Error::new(e);
70 log::error!("{:#}", e);
71 e
72 })
73 }
74
75 fn rollback(self) -> impl Future<Output = Result<()>> {
76 self.0.rollback().map_err(|e| {
77 let e = Error::new(e);
78 log::error!("{:#}", e);
79 e
80 })
81 }
82}