tank_postgres/
connection.rs

1use crate::{
2    PostgresDriver, PostgresPrepared, PostgresTransaction, ValueWrap,
3    util::{
4        stream_postgres_row_to_tank_row, stream_postgres_simple_query_message_to_tank_query_result,
5    },
6};
7use async_stream::try_stream;
8use std::{borrow::Cow, pin::pin, sync::Arc};
9use tank_core::{
10    Connection, Driver, Error, ErrorContext, Executor, Query, QueryResult, Result, Transaction,
11    future::Either,
12    stream::{Stream, StreamExt, TryStreamExt},
13    truncate_long,
14};
15use tokio::spawn;
16use tokio_postgres::NoTls;
17
/// A connection to a Postgres server, built around a `tokio_postgres::Client`.
pub struct PostgresConnection {
    /// Client handle through which statements are prepared and queries are sent.
    pub(crate) client: tokio_postgres::Client,
    /// Initialised to `false` by `connect`; it is not read anywhere in the visible
    /// part of this module.
    /// NOTE(review): presumably marks a connection that is inside a transaction —
    /// confirm against `PostgresTransaction`.
    pub(crate) _transaction: bool,
}
22
23impl Executor for PostgresConnection {
24    type Driver = PostgresDriver;
25
26    fn driver(&self) -> &Self::Driver {
27        &PostgresDriver {}
28    }
29
30    async fn prepare(&mut self, sql: String) -> Result<Query<Self::Driver>> {
31        let sql = sql.trim_end().trim_end_matches(';');
32        Ok(
33            PostgresPrepared::new(self.client.prepare(&sql).await.map_err(|e| {
34                let e = Error::new(e).context(format!(
35                    "While preparing the query:\n{}",
36                    truncate_long!(sql)
37                ));
38                log::error!("{:#}", e);
39                e
40            })?)
41            .into(),
42        )
43    }
44
45    fn run(
46        &mut self,
47        query: Query<Self::Driver>,
48    ) -> impl Stream<Item = Result<QueryResult>> + Send {
49        let context = Arc::new(format!("While running the query:\n{}", query));
50        match query {
51            Query::Raw(sql) => {
52                Either::Left(stream_postgres_simple_query_message_to_tank_query_result(
53                    async move || self.client.simple_query_raw(&sql).await.map_err(Into::into),
54                ))
55            }
56            Query::Prepared(..) => Either::Right(try_stream! {
57                let mut transaction = self.begin().await?;
58                {
59                    let mut stream = pin!(transaction.run(query));
60                    while let Some(value) = stream.next().await.transpose()? {
61                        yield value;
62                    }
63                }
64                transaction.commit().await?;
65            }),
66        }
67        .map_err(move |e: Error| {
68            let e = e.context(context.clone());
69            log::error!("{:#}", e);
70            e
71        })
72    }
73
74    fn fetch<'s>(
75        &'s mut self,
76        query: Query<Self::Driver>,
77    ) -> impl Stream<Item = Result<tank_core::RowLabeled>> + Send + 's {
78        let context = Arc::new(format!("While fetching the query:\n{}", query));
79        match query {
80            Query::Raw(sql) => Either::Left(stream_postgres_row_to_tank_row(async move || {
81                self.client
82                    .query_raw(&sql, Vec::<ValueWrap>::new())
83                    .await
84                    .map_err(|e| {
85                        let e = Error::new(e).context(context.clone());
86                        log::error!("{:#}", e);
87                        e
88                    })
89            })),
90            Query::Prepared(..) => Either::Right(
91                try_stream! {
92                    let mut transaction = self.begin().await?;
93                    {
94                        let mut stream = pin!(transaction.fetch(query));
95                        while let Some(value) = stream.next().await.transpose()? {
96                            yield value;
97                        }
98                    }
99                    transaction.commit().await?;
100                }
101                .map_err(move |e: Error| {
102                    let e = e.context(context.clone());
103                    log::error!("{:#}", e);
104                    e
105                }),
106            ),
107        }
108    }
109}
110
111impl Connection for PostgresConnection {
112    #[allow(refining_impl_trait)]
113    async fn connect(url: Cow<'static, str>) -> Result<PostgresConnection> {
114        let prefix = format!("{}://", <Self::Driver as Driver>::NAME);
115        if !url.starts_with(&prefix) {
116            let error = Error::msg(format!(
117                "Postgres connection url must start with `{}`",
118                &prefix
119            ));
120            log::error!("{:#}", error);
121            return Err(error);
122        }
123        let (client, connection) = tokio_postgres::connect(&url, NoTls)
124            .await
125            .with_context(|| format!("While trying to connect to `{}`", url))?;
126        spawn(async move {
127            if let Err(e) = connection.await {
128                log::error!("Postgres connection error: {:#}", e);
129            }
130        });
131
132        Ok(Self {
133            client,
134            _transaction: false,
135        })
136    }
137
138    #[allow(refining_impl_trait)]
139    fn begin(&mut self) -> impl Future<Output = Result<PostgresTransaction<'_>>> {
140        PostgresTransaction::new(self)
141    }
142}