tank_postgres/
connection.rs1use crate::{
2 PostgresDriver, PostgresPrepared, PostgresTransaction, ValueWrap,
3 util::{
4 stream_postgres_row_to_tank_row, stream_postgres_simple_query_message_to_tank_query_result,
5 },
6};
7use async_stream::try_stream;
8use std::{borrow::Cow, pin::pin, sync::Arc};
9use tank_core::{
10 Connection, Driver, Error, ErrorContext, Executor, Query, QueryResult, Result, Transaction,
11 future::Either,
12 stream::{Stream, StreamExt, TryStreamExt},
13 truncate_long,
14};
15use tokio::spawn;
16use tokio_postgres::NoTls;
17
18pub struct PostgresConnection {
19 pub(crate) client: tokio_postgres::Client,
20 pub(crate) _transaction: bool,
21}
22
23impl Executor for PostgresConnection {
24 type Driver = PostgresDriver;
25
26 fn driver(&self) -> &Self::Driver {
27 &PostgresDriver {}
28 }
29
30 async fn prepare(&mut self, sql: String) -> Result<Query<Self::Driver>> {
31 let sql = sql.trim_end().trim_end_matches(';');
32 Ok(
33 PostgresPrepared::new(self.client.prepare(&sql).await.map_err(|e| {
34 let e = Error::new(e).context(format!(
35 "While preparing the query:\n{}",
36 truncate_long!(sql)
37 ));
38 log::error!("{:#}", e);
39 e
40 })?)
41 .into(),
42 )
43 }
44
45 fn run(
46 &mut self,
47 query: Query<Self::Driver>,
48 ) -> impl Stream<Item = Result<QueryResult>> + Send {
49 let context = Arc::new(format!("While running the query:\n{}", query));
50 match query {
51 Query::Raw(sql) => {
52 Either::Left(stream_postgres_simple_query_message_to_tank_query_result(
53 async move || self.client.simple_query_raw(&sql).await.map_err(Into::into),
54 ))
55 }
56 Query::Prepared(..) => Either::Right(try_stream! {
57 let mut transaction = self.begin().await?;
58 {
59 let stream = transaction.run(query);
60 let mut stream = pin!(stream);
61 while let Some(value) = stream.next().await.transpose()? {
62 yield value;
63 }
64 }
65 transaction.commit().await?;
66 }),
67 }
68 .map_err(move |e: Error| {
69 let e = e.context(context.clone());
70 log::error!("{:#}", e);
71 e
72 })
73 }
74
75 fn fetch<'s>(
76 &'s mut self,
77 query: Query<Self::Driver>,
78 ) -> impl Stream<Item = Result<tank_core::RowLabeled>> + Send + 's {
79 let context = Arc::new(format!("While fetching the query:\n{}", query));
80 match query {
81 Query::Raw(sql) => Either::Left(stream_postgres_row_to_tank_row(async move || {
82 self.client
83 .query_raw(&sql, Vec::<ValueWrap>::new())
84 .await
85 .map_err(|e| {
86 let e = Error::new(e).context(context.clone());
87 log::error!("{:#}", e);
88 e
89 })
90 })),
91 Query::Prepared(..) => Either::Right(
92 try_stream! {
93 let mut transaction = self.begin().await?;
94 {
95 let stream = transaction.fetch(query);
96 let mut stream = pin!(stream);
97 while let Some(value) = stream.next().await.transpose()? {
98 yield value;
99 }
100 }
101 transaction.commit().await?;
102 }
103 .map_err(move |e: Error| {
104 let e = e.context(context.clone());
105 log::error!("{:#}", e);
106 e
107 }),
108 ),
109 }
110 }
111}
112
113impl Connection for PostgresConnection {
114 #[allow(refining_impl_trait)]
115 async fn connect(url: Cow<'static, str>) -> Result<PostgresConnection> {
116 let prefix = format!("{}://", <Self::Driver as Driver>::NAME);
117 if !url.starts_with(&prefix) {
118 let error = Error::msg(format!(
119 "Postgres connection url must start with `{}`",
120 &prefix
121 ));
122 log::error!("{:#}", error);
123 return Err(error);
124 }
125 let (client, connection) = tokio_postgres::connect(&url, NoTls)
126 .await
127 .with_context(|| format!("While trying to connect to `{}`", url))?;
128 spawn(async move {
129 if let Err(e) = connection.await {
130 log::error!("Postgres connection error: {:#}", e);
131 }
132 });
133
134 Ok(Self {
135 client,
136 _transaction: false,
137 })
138 }
139
140 #[allow(refining_impl_trait)]
141 fn begin(&mut self) -> impl Future<Output = Result<PostgresTransaction<'_>>> {
142 PostgresTransaction::new(self)
143 }
144}