tank_postgres/
transaction.rs1use std::mem;
2
3use crate::{
4 PostgresConnection, PostgresDriver, PostgresPrepared, ValueWrap,
5 util::stream_postgres_row_to_tank_row,
6};
7use tank_core::{
8 AsQuery, Error, Executor, Query, QueryResult, Result, Transaction,
9 future::{Either, TryFutureExt},
10 stream::{Stream, TryStreamExt},
11};
12
13pub struct PostgresTransaction<'c>(pub(crate) tokio_postgres::Transaction<'c>);
14
15impl<'c> PostgresTransaction<'c> {
16 pub async fn new(client: &'c mut PostgresConnection) -> Result<Self> {
17 Ok(Self(client.client.transaction().await.map_err(|e| {
18 log::error!("{:#}", e);
19 e
20 })?))
21 }
22}
23
24impl<'c> Executor for PostgresTransaction<'c> {
25 type Driver = PostgresDriver;
26 fn driver(&self) -> &Self::Driver {
27 &PostgresDriver {}
28 }
29 async fn prepare(&mut self, query: String) -> Result<Query<Self::Driver>> {
30 Ok(
31 PostgresPrepared::new(self.0.prepare(&query).await.map_err(|e| {
32 let error = Error::new(e);
33 log::error!("{:#}", error);
34 error
35 })?)
36 .into(),
37 )
38 }
39 fn run<'s>(
40 &'s mut self,
41 query: impl AsQuery<Self::Driver> + 's,
42 ) -> impl Stream<Item = Result<QueryResult>> + Send {
43 let mut query = query.as_query();
44 let mut owned = mem::take(query.as_mut());
45 stream_postgres_row_to_tank_row(async move || match &mut owned {
46 Query::Raw(sql) => {
47 let stream = self.0.query_raw(sql, Vec::<ValueWrap>::new()).await?;
48 Ok(Either::Left(stream))
49 }
50 Query::Prepared(prepared) => {
51 let portal = if !prepared.is_complete() {
52 prepared.complete(self).await?
53 } else {
54 prepared.get_portal().ok_or(Error::msg(format!(
55 "The prepared statement `{}` is not complete",
56 prepared
57 )))?
58 };
59 Ok(Either::Right(self.0.query_portal_raw(&portal, 0).await?))
60 }
61 })
62 .map_err(|e| {
63 log::error!("{:#}", e);
64 e
65 })
66 }
67}
68
69impl<'c> Transaction<'c> for PostgresTransaction<'c> {
70 fn commit(self) -> impl Future<Output = Result<()>> {
71 self.0.commit().map_err(|e| {
72 let e = Error::new(e);
73 log::error!("{:#}", e);
74 e
75 })
76 }
77 fn rollback(self) -> impl Future<Output = Result<()>> {
78 self.0.rollback().map_err(|e| {
79 let e = Error::new(e);
80 log::error!("{:#}", e);
81 e
82 })
83 }
84}