tank_postgres/
connection.rs1use crate::{
2 PostgresDriver, PostgresPrepared, PostgresTransaction, ValueHolder, util::row_to_tank_row,
3};
4use async_stream::try_stream;
5use std::{borrow::Cow, future, pin::pin, sync::Arc};
6use tank_core::{
7 Connection, Driver, Error, ErrorContext, Executor, Query, QueryResult, Result, RowLabeled,
8 future::Either,
9 printable_query,
10 stream::{self, Stream, StreamExt},
11};
12use tokio::spawn;
13use tokio_postgres::NoTls;
14
15pub struct PostgresConnection {
16 pub(crate) client: tokio_postgres::Client,
17 pub(crate) _transaction: bool,
18}
19
20impl Executor for PostgresConnection {
21 type Driver = PostgresDriver;
22
23 fn driver(&self) -> &Self::Driver {
24 &PostgresDriver {}
25 }
26
27 async fn prepare(&mut self, sql: String) -> Result<Query<Self::Driver>> {
28 Ok(
29 PostgresPrepared::new(self.client.prepare(&sql).await.with_context(|| {
30 format!(
31 "While preparing the query:\n{}",
32 printable_query!(sql.as_str())
33 )
34 })?)
35 .into(),
36 )
37 }
38
39 fn run(
40 &mut self,
41 query: Query<Self::Driver>,
42 ) -> impl Stream<Item = Result<QueryResult>> + Send {
43 let context = Arc::new(format!("While executing the query:\n{}", query));
44 match query {
45 Query::Raw(sql) => Either::Left(try_stream! {
46 let stream = self
47 .client
48 .query_raw(&sql, Vec::<ValueHolder>::new())
49 .await
50 .context(context.clone())?;
51 let mut stream = pin!(stream);
52 if let Some(first) = stream.next().await {
53 let labels = first?
54 .columns()
55 .iter()
56 .map(|c| c.name().to_string())
57 .collect::<tank_core::RowNames>();
58 while let Some(value) = stream.next().await {
59 yield RowLabeled {
60 labels: labels.clone(),
61 values: row_to_tank_row(value?).into(),
62 }
63 .into()
64 }
65 }
66 }),
67 Query::Prepared(..) => Either::Right(stream::once(future::ready(Err(Error::msg(
68 "Cannot run a prepares statement without a transaction",
69 )
70 .context(context.clone()))))),
71 }
72 }
73}
74
75impl Connection for PostgresConnection {
76 #[allow(refining_impl_trait)]
77 async fn connect(url: Cow<'static, str>) -> Result<PostgresConnection> {
78 let prefix = format!("{}://", <Self::Driver as Driver>::NAME);
79 if !url.starts_with(&prefix) {
80 let error = Error::msg(format!(
81 "Postgres connection url must start with `{}`",
82 &prefix
83 ));
84 log::error!("{:#}", error);
85 return Err(error);
86 }
87 let (client, connection) = tokio_postgres::connect(&url, NoTls)
88 .await
89 .with_context(|| format!("While trying to connect to `{}`", url))?;
90 spawn(async move {
91 if let Err(e) = connection.await {
92 log::error!("Postgres connection error: {:#}", e);
93 }
94 });
95
96 Ok(Self {
97 client,
98 _transaction: false,
99 })
100 }
101
102 #[allow(refining_impl_trait)]
103 fn begin(&mut self) -> impl Future<Output = Result<PostgresTransaction<'_>>> {
104 PostgresTransaction::new(self)
105 }
106}