tank_postgres/
connection.rs

1use crate::{
2    PostgresDriver, PostgresPrepared, PostgresTransaction, ValueWrap,
3    util::{
4        stream_postgres_row_to_tank_row, stream_postgres_simple_query_message_to_tank_query_result,
5    },
6};
7use async_stream::try_stream;
8use std::{borrow::Cow, pin::pin, sync::Arc};
9use tank_core::{
10    Connection, Driver, Error, ErrorContext, Executor, Query, QueryResult, Result, Transaction,
11    future::Either,
12    stream::{Stream, StreamExt, TryStreamExt},
13    truncate_long,
14};
15use tokio::spawn;
16use tokio_postgres::NoTls;
17
18pub struct PostgresConnection {
19    pub(crate) client: tokio_postgres::Client,
20    pub(crate) _transaction: bool,
21}
22
23impl Executor for PostgresConnection {
24    type Driver = PostgresDriver;
25
26    fn driver(&self) -> &Self::Driver {
27        &PostgresDriver {}
28    }
29
30    async fn prepare(&mut self, sql: String) -> Result<Query<Self::Driver>> {
31        let sql = sql.trim_end().trim_end_matches(';');
32        Ok(
33            PostgresPrepared::new(self.client.prepare(&sql).await.map_err(|e| {
34                let e = Error::new(e).context(format!(
35                    "While preparing the query:\n{}",
36                    truncate_long!(sql)
37                ));
38                log::error!("{:#}", e);
39                e
40            })?)
41            .into(),
42        )
43    }
44
45    fn run(
46        &mut self,
47        query: Query<Self::Driver>,
48    ) -> impl Stream<Item = Result<QueryResult>> + Send {
49        let context = Arc::new(format!("While running the query:\n{}", query));
50        match query {
51            Query::Raw(sql) => {
52                Either::Left(stream_postgres_simple_query_message_to_tank_query_result(
53                    async move || self.client.simple_query_raw(&sql).await.map_err(Into::into),
54                ))
55            }
56            Query::Prepared(..) => Either::Right(try_stream! {
57                let mut transaction = self.begin().await?;
58                {
59                    let stream = transaction.run(query);
60                    let mut stream = pin!(stream);
61                    while let Some(value) = stream.next().await.transpose()? {
62                        yield value;
63                    }
64                }
65                transaction.commit().await?;
66            }),
67        }
68        .map_err(move |e: Error| {
69            let e = e.context(context.clone());
70            log::error!("{:#}", e);
71            e
72        })
73    }
74
75    fn fetch<'s>(
76        &'s mut self,
77        query: Query<Self::Driver>,
78    ) -> impl Stream<Item = Result<tank_core::RowLabeled>> + Send + 's {
79        let context = Arc::new(format!("While fetching the query:\n{}", query));
80        match query {
81            Query::Raw(sql) => Either::Left(stream_postgres_row_to_tank_row(async move || {
82                self.client
83                    .query_raw(&sql, Vec::<ValueWrap>::new())
84                    .await
85                    .map_err(|e| {
86                        let e = Error::new(e).context(context.clone());
87                        log::error!("{:#}", e);
88                        e
89                    })
90            })),
91            Query::Prepared(..) => Either::Right(
92                try_stream! {
93                    let mut transaction = self.begin().await?;
94                    {
95                        let stream = transaction.fetch(query);
96                        let mut stream = pin!(stream);
97                        while let Some(value) = stream.next().await.transpose()? {
98                            yield value;
99                        }
100                    }
101                    transaction.commit().await?;
102                }
103                .map_err(move |e: Error| {
104                    let e = e.context(context.clone());
105                    log::error!("{:#}", e);
106                    e
107                }),
108            ),
109        }
110    }
111}
112
113impl Connection for PostgresConnection {
114    #[allow(refining_impl_trait)]
115    async fn connect(url: Cow<'static, str>) -> Result<PostgresConnection> {
116        let prefix = format!("{}://", <Self::Driver as Driver>::NAME);
117        if !url.starts_with(&prefix) {
118            let error = Error::msg(format!(
119                "Postgres connection url must start with `{}`",
120                &prefix
121            ));
122            log::error!("{:#}", error);
123            return Err(error);
124        }
125        let (client, connection) = tokio_postgres::connect(&url, NoTls)
126            .await
127            .with_context(|| format!("While trying to connect to `{}`", url))?;
128        spawn(async move {
129            if let Err(e) = connection.await {
130                log::error!("Postgres connection error: {:#}", e);
131            }
132        });
133
134        Ok(Self {
135            client,
136            _transaction: false,
137        })
138    }
139
140    #[allow(refining_impl_trait)]
141    fn begin(&mut self) -> impl Future<Output = Result<PostgresTransaction<'_>>> {
142        PostgresTransaction::new(self)
143    }
144}