tank_postgres/
connection.rs

1use crate::{
2    PostgresDriver, PostgresPrepared, PostgresTransaction, ValueHolder, util::row_to_tank_row,
3};
4use async_stream::try_stream;
5use std::{borrow::Cow, future, pin::pin, sync::Arc};
6use tank_core::{
7    Connection, Driver, Error, ErrorContext, Executor, Query, QueryResult, Result, RowLabeled,
8    future::Either,
9    printable_query,
10    stream::{self, Stream, StreamExt},
11};
12use tokio::spawn;
13use tokio_postgres::NoTls;
14
15pub struct PostgresConnection {
16    pub(crate) client: tokio_postgres::Client,
17    pub(crate) _transaction: bool,
18}
19
20impl Executor for PostgresConnection {
21    type Driver = PostgresDriver;
22
23    fn driver(&self) -> &Self::Driver {
24        &PostgresDriver {}
25    }
26
27    async fn prepare(&mut self, sql: String) -> Result<Query<Self::Driver>> {
28        Ok(
29            PostgresPrepared::new(self.client.prepare(&sql).await.with_context(|| {
30                format!(
31                    "While preparing the query:\n{}",
32                    printable_query!(sql.as_str())
33                )
34            })?)
35            .into(),
36        )
37    }
38
39    fn run(
40        &mut self,
41        query: Query<Self::Driver>,
42    ) -> impl Stream<Item = Result<QueryResult>> + Send {
43        let context = Arc::new(format!("While executing the query:\n{}", query));
44        match query {
45            Query::Raw(sql) => Either::Left(try_stream! {
46                let stream = self
47                    .client
48                    .query_raw(&sql, Vec::<ValueHolder>::new())
49                    .await
50                    .context(context.clone())?;
51                let mut stream = pin!(stream);
52                if let Some(first) = stream.next().await {
53                    let labels = first?
54                        .columns()
55                        .iter()
56                        .map(|c| c.name().to_string())
57                        .collect::<tank_core::RowNames>();
58                    while let Some(value) = stream.next().await {
59                        yield RowLabeled {
60                            labels: labels.clone(),
61                            values: row_to_tank_row(value?).into(),
62                        }
63                        .into()
64                    }
65                }
66            }),
67            Query::Prepared(..) => Either::Right(stream::once(future::ready(Err(Error::msg(
68                "Cannot run a prepares statement without a transaction",
69            )
70            .context(context.clone()))))),
71        }
72    }
73}
74
75impl Connection for PostgresConnection {
76    #[allow(refining_impl_trait)]
77    async fn connect(url: Cow<'static, str>) -> Result<PostgresConnection> {
78        let prefix = format!("{}://", <Self::Driver as Driver>::NAME);
79        if !url.starts_with(&prefix) {
80            let error = Error::msg(format!(
81                "Postgres connection url must start with `{}`",
82                &prefix
83            ));
84            log::error!("{:#}", error);
85            return Err(error);
86        }
87        let (client, connection) = tokio_postgres::connect(&url, NoTls)
88            .await
89            .with_context(|| format!("While trying to connect to `{}`", url))?;
90        spawn(async move {
91            if let Err(e) = connection.await {
92                log::error!("Postgres connection error: {:#}", e);
93            }
94        });
95
96        Ok(Self {
97            client,
98            _transaction: false,
99        })
100    }
101
102    #[allow(refining_impl_trait)]
103    fn begin(&mut self) -> impl Future<Output = Result<PostgresTransaction<'_>>> {
104        PostgresTransaction::new(self)
105    }
106}