tank_postgres/
connection.rs1use crate::{
2 PostgresDriver, PostgresPrepared, PostgresTransaction, ValueWrap,
3 util::{
4 stream_postgres_row_to_tank_row, stream_postgres_simple_query_message_to_tank_query_result,
5 },
6};
7use async_stream::try_stream;
8use std::{borrow::Cow, pin::pin, sync::Arc};
9use tank_core::{
10 Connection, Driver, Error, ErrorContext, Executor, Query, QueryResult, Result, Transaction,
11 future::Either,
12 stream::{Stream, StreamExt, TryStreamExt},
13 truncate_long,
14};
15use tokio::spawn;
16use tokio_postgres::NoTls;
17
18pub struct PostgresConnection {
19 pub(crate) client: tokio_postgres::Client,
20 pub(crate) _transaction: bool,
21}
22
23impl Executor for PostgresConnection {
24 type Driver = PostgresDriver;
25
26 fn driver(&self) -> &Self::Driver {
27 &PostgresDriver {}
28 }
29
30 async fn prepare(&mut self, sql: String) -> Result<Query<Self::Driver>> {
31 let sql = sql.trim_end().trim_end_matches(';');
32 Ok(
33 PostgresPrepared::new(self.client.prepare(&sql).await.map_err(|e| {
34 let e = Error::new(e).context(format!(
35 "While preparing the query:\n{}",
36 truncate_long!(sql)
37 ));
38 log::error!("{:#}", e);
39 e
40 })?)
41 .into(),
42 )
43 }
44
45 fn run(
46 &mut self,
47 query: Query<Self::Driver>,
48 ) -> impl Stream<Item = Result<QueryResult>> + Send {
49 let context = Arc::new(format!("While running the query:\n{}", query));
50 match query {
51 Query::Raw(sql) => {
52 Either::Left(stream_postgres_simple_query_message_to_tank_query_result(
53 async move || self.client.simple_query_raw(&sql).await.map_err(Into::into),
54 ))
55 }
56 Query::Prepared(..) => Either::Right(try_stream! {
57 let mut transaction = self.begin().await?;
58 {
59 let mut stream = pin!(transaction.run(query));
60 while let Some(value) = stream.next().await.transpose()? {
61 yield value;
62 }
63 }
64 transaction.commit().await?;
65 }),
66 }
67 .map_err(move |e: Error| {
68 let e = e.context(context.clone());
69 log::error!("{:#}", e);
70 e
71 })
72 }
73
74 fn fetch<'s>(
75 &'s mut self,
76 query: Query<Self::Driver>,
77 ) -> impl Stream<Item = Result<tank_core::RowLabeled>> + Send + 's {
78 let context = Arc::new(format!("While fetching the query:\n{}", query));
79 match query {
80 Query::Raw(sql) => Either::Left(stream_postgres_row_to_tank_row(async move || {
81 self.client
82 .query_raw(&sql, Vec::<ValueWrap>::new())
83 .await
84 .map_err(|e| {
85 let e = Error::new(e).context(context.clone());
86 log::error!("{:#}", e);
87 e
88 })
89 })),
90 Query::Prepared(..) => Either::Right(
91 try_stream! {
92 let mut transaction = self.begin().await?;
93 {
94 let mut stream = pin!(transaction.fetch(query));
95 while let Some(value) = stream.next().await.transpose()? {
96 yield value;
97 }
98 }
99 transaction.commit().await?;
100 }
101 .map_err(move |e: Error| {
102 let e = e.context(context.clone());
103 log::error!("{:#}", e);
104 e
105 }),
106 ),
107 }
108 }
109}
110
111impl Connection for PostgresConnection {
112 #[allow(refining_impl_trait)]
113 async fn connect(url: Cow<'static, str>) -> Result<PostgresConnection> {
114 let prefix = format!("{}://", <Self::Driver as Driver>::NAME);
115 if !url.starts_with(&prefix) {
116 let error = Error::msg(format!(
117 "Postgres connection url must start with `{}`",
118 &prefix
119 ));
120 log::error!("{:#}", error);
121 return Err(error);
122 }
123 let (client, connection) = tokio_postgres::connect(&url, NoTls)
124 .await
125 .with_context(|| format!("While trying to connect to `{}`", url))?;
126 spawn(async move {
127 if let Err(e) = connection.await {
128 log::error!("Postgres connection error: {:#}", e);
129 }
130 });
131
132 Ok(Self {
133 client,
134 _transaction: false,
135 })
136 }
137
138 #[allow(refining_impl_trait)]
139 fn begin(&mut self) -> impl Future<Output = Result<PostgresTransaction<'_>>> {
140 PostgresTransaction::new(self)
141 }
142}