use crate::error::{Error, NoRowsReturned, QueryOneError, Result};
use crate::{Config, Row, RowStream, Statement, ToSql, ToStatement};
use af_core::prelude::*;
use af_core::task::{self, Task};
#[derive(Clone)]
pub struct Client {
inner: Arc<tokio_postgres::Client>,
}
pub async fn connect(config: &Config) -> Result<(Client, Task<Result>)> {
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
let tls_connector = TlsConnector::builder().danger_accept_invalid_certs(true).build().unwrap();
let (client, connection) = config.connect(MakeTlsConnector::new(tls_connector)).await?;
let task = task::start(connection.map_err(Error::from));
Ok((Client::wrap(client), task))
}
fn param_iter<'a>(
p: &'a [&'a (dyn ToSql + Sync)],
) -> impl ExactSizeIterator<Item = &'a dyn ToSql> + 'a {
p.iter().map(|p| *p as _)
}
impl Client {
pub(crate) fn wrap(client: tokio_postgres::Client) -> Self {
Self { inner: Arc::new(client) }
}
pub async fn execute(
&self,
statement: &(impl ToStatement + ?Sized),
params: &[&(dyn ToSql + Sync)],
) -> Result<u64> {
Ok(self.inner.execute_raw(statement, param_iter(params)).await?)
}
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
pub async fn batch_execute(&self, statements: impl AsRef<str>) -> Result {
Ok(self.inner.batch_execute(statements.as_ref()).await?)
}
pub async fn begin(&self) -> Result {
Ok(self.inner.batch_execute("BEGIN;").await?)
}
pub async fn commit(&self) -> Result {
Ok(self.inner.batch_execute("COMMIT;").await?)
}
pub async fn prepare(&self, query: impl AsRef<str>) -> Result<Statement> {
Ok(self.inner.prepare(query.as_ref()).await?)
}
pub async fn query(
&self,
query: &(impl ToStatement + ?Sized),
params: &[&(dyn ToSql + Sync)],
) -> Result<RowStream> {
Ok(self.inner.query_raw(query, param_iter(params)).await?)
}
pub async fn query_opt(
&self,
query: &(impl ToStatement + ?Sized),
params: &[&(dyn ToSql + Sync)],
) -> Result<Option<Row>> {
let rows = self.query(query, params).await?;
pin!(rows);
Ok(rows.next().await.transpose()?)
}
pub async fn query_one(
&self,
query: &(impl ToStatement + ?Sized),
params: &[&(dyn ToSql + Sync)],
) -> Result<Row, QueryOneError> {
self.query_opt(query, params).await?.ok_or(NoRowsReturned)
}
pub async fn rollback(&self) -> Result {
Ok(self.inner.batch_execute("ROLLBACK;").await?)
}
}