tank_postgres/
transaction.rs1use crate::{
2 PostgresConnection, PostgresDriver, PostgresPrepared, ValueWrap,
3 util::stream_postgres_row_to_tank_row,
4};
5use std::mem;
6use tank_core::{
7 AsQuery, Error, Executor, Query, QueryResult, Result, Transaction,
8 future::{Either, TryFutureExt},
9 stream::{Stream, TryStreamExt},
10};
11
12pub struct PostgresTransaction<'c>(pub(crate) tokio_postgres::Transaction<'c>);
13
14impl<'c> PostgresTransaction<'c> {
15 pub async fn new(client: &'c mut PostgresConnection) -> Result<Self> {
16 Ok(Self(client.client.transaction().await.map_err(|e| {
17 log::error!("{:#}", e);
18 e
19 })?))
20 }
21}
22
23impl<'c> Executor for PostgresTransaction<'c> {
24 type Driver = PostgresDriver;
25 fn driver(&self) -> &Self::Driver {
26 &PostgresDriver {}
27 }
28 async fn prepare(&mut self, query: String) -> Result<Query<Self::Driver>> {
29 Ok(
30 PostgresPrepared::new(self.0.prepare(&query).await.map_err(|e| {
31 let error = Error::new(e);
32 log::error!("{:#}", error);
33 error
34 })?)
35 .into(),
36 )
37 }
38 fn run<'s>(
39 &'s mut self,
40 query: impl AsQuery<Self::Driver> + 's,
41 ) -> impl Stream<Item = Result<QueryResult>> + Send {
42 let mut query = query.as_query();
43 let mut owned = mem::take(query.as_mut());
44 stream_postgres_row_to_tank_row(async move || match &mut owned {
45 Query::Raw(sql) => {
46 let stream = self.0.query_raw(sql, Vec::<ValueWrap>::new()).await?;
47 Ok(Either::Left(stream))
48 }
49 Query::Prepared(prepared) => {
50 let params = prepared.take_params();
51 let portal = self
52 .0
53 .bind_raw(&prepared.statement, params.into_iter())
54 .await?;
55 Ok(Either::Right(self.0.query_portal_raw(&portal, 0).await?))
56 }
57 })
58 .map_err(|e| {
59 log::error!("{:#}", e);
60 e
61 })
62 }
63}
64
65impl<'c> Transaction<'c> for PostgresTransaction<'c> {
66 fn commit(self) -> impl Future<Output = Result<()>> {
67 self.0.commit().map_err(|e| {
68 let e = Error::new(e);
69 log::error!("{:#}", e);
70 e
71 })
72 }
73 fn rollback(self) -> impl Future<Output = Result<()>> {
74 self.0.rollback().map_err(|e| {
75 let e = Error::new(e);
76 log::error!("{:#}", e);
77 e
78 })
79 }
80}