use tokio_postgres::types::ToSql;
use tokio_postgres::{Error, Row};
use tracing::Instrument;
/// Thin wrapper around a `deadpool_postgres::Pool` whose `get` hands out
/// [`TracedObject`] connections instead of raw pooled objects.
#[derive(Clone)]
pub struct TracedPool {
/// The wrapped pool; borrowed read-only through `TracedPool::inner`.
inner: deadpool_postgres::Pool,
}
impl TracedPool {
pub fn new(pool: deadpool_postgres::Pool) -> Self {
Self { inner: pool }
}
pub async fn get(&self) -> Result<TracedObject, deadpool_postgres::PoolError> {
let conn = self.inner.get().await?;
Ok(TracedObject { inner: conn })
}
pub fn inner(&self) -> &deadpool_postgres::Pool {
&self.inner
}
}
/// A pooled connection (`deadpool_postgres::Object`) whose query methods run
/// inside `tracing` debug spans.
pub struct TracedObject {
/// The wrapped pooled connection; borrowed through `TracedObject::inner`.
inner: deadpool_postgres::Object,
}
impl TracedObject {
    /// Runs a statement inside a `db.execute` debug span.
    ///
    /// The span carries the SQL text (`sql`), the number of bound parameters
    /// (`params`) and, once the call has succeeded, the count returned by the
    /// client (`affected`).
    ///
    /// # Errors
    ///
    /// Returns the `tokio_postgres::Error` produced by the client; `affected`
    /// is left unrecorded in that case.
    pub async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
        let span = tracing::debug_span!(
            "db.execute",
            sql = %sql,
            params = params.len(),
            affected = tracing::field::Empty,
        );
        // Deref-coerce the pooled object down to the raw client so the
        // inherent `tokio_postgres::Client` method is the one being called.
        let client: &tokio_postgres::Client = &self.inner;
        let affected = client.execute(sql, params).instrument(span.clone()).await?;
        span.record("affected", affected);
        Ok(affected)
    }

    /// Runs a query inside a `db.query` debug span and returns all rows.
    ///
    /// The span carries `sql`, `params` (parameter count) and, on success,
    /// `rows` (number of rows returned).
    ///
    /// # Errors
    ///
    /// Returns the `tokio_postgres::Error` produced by the client; `rows` is
    /// left unrecorded in that case.
    pub async fn query(
        &self,
        sql: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> Result<Vec<Row>, Error> {
        let span = tracing::debug_span!(
            "db.query",
            sql = %sql,
            params = params.len(),
            rows = tracing::field::Empty,
        );
        let client: &tokio_postgres::Client = &self.inner;
        let rows = client.query(sql, params).instrument(span.clone()).await?;
        span.record("rows", rows.len());
        Ok(rows)
    }

    /// Runs a query inside a `db.query` debug span and returns the optional row
    /// produced by the client's `query_opt`.
    ///
    /// On success `rows` is recorded as `1` when a row came back and `0`
    /// otherwise.
    ///
    /// # Errors
    ///
    /// Returns the `tokio_postgres::Error` produced by the client; `rows` is
    /// left unrecorded in that case.
    pub async fn query_opt(
        &self,
        sql: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> Result<Option<Row>, Error> {
        let span = tracing::debug_span!(
            "db.query",
            sql = %sql,
            params = params.len(),
            rows = tracing::field::Empty,
        );
        let client: &tokio_postgres::Client = &self.inner;
        let row = client
            .query_opt(sql, params)
            .instrument(span.clone())
            .await?;
        span.record("rows", u64::from(row.is_some()));
        Ok(row)
    }

    /// Runs a query inside a `db.query` debug span and returns the single row
    /// produced by the client's `query_one`.
    ///
    /// `rows` is recorded as `1` only after the call has succeeded, matching
    /// `query` and `query_opt`; a failed call no longer reports a row that was
    /// never returned.
    ///
    /// # Errors
    ///
    /// Returns the `tokio_postgres::Error` produced by the client; `rows` is
    /// left unrecorded in that case.
    pub async fn query_one(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Row, Error> {
        let span = tracing::debug_span!(
            "db.query",
            sql = %sql,
            params = params.len(),
            rows = tracing::field::Empty,
        );
        let client: &tokio_postgres::Client = &self.inner;
        let row = client
            .query_one(sql, params)
            .instrument(span.clone())
            .await?;
        span.record("rows", 1u64);
        Ok(row)
    }

    /// Borrows the wrapped pooled connection.
    pub fn inner(&self) -> &deadpool_postgres::Object {
        &self.inner
    }
}
/// Borrowing wrapper that runs queries on a connection inside `tracing`
/// debug spans.
///
/// The `Connection` bound is stated on the `impl` blocks that need it rather
/// than on the type itself, so the struct can be named without repeating the
/// bound.
pub struct TracedConn<'a, C> {
    /// The borrowed connection every call is forwarded to.
    conn: &'a C,
}
impl<'a, C: Connection> TracedConn<'a, C> {
    /// Wraps a borrowed connection.
    pub fn new(conn: &'a C) -> Self {
        Self { conn }
    }

    /// Runs a statement inside a `db.execute` debug span.
    ///
    /// The span carries the SQL text (`sql`), the number of bound parameters
    /// (`params`) and, once the call has succeeded, the count returned by the
    /// connection (`affected`).
    ///
    /// # Errors
    ///
    /// Returns the `tokio_postgres::Error` produced by the connection;
    /// `affected` is left unrecorded in that case.
    pub async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
        let span = tracing::debug_span!(
            "db.execute",
            sql = %sql,
            params = params.len(),
            affected = tracing::field::Empty,
        );
        let affected = self
            .conn
            .execute(sql, params)
            .instrument(span.clone())
            .await?;
        span.record("affected", affected);
        Ok(affected)
    }

    /// Runs a query inside a `db.query` debug span and returns all rows.
    ///
    /// The span carries `sql`, `params` (parameter count) and, on success,
    /// `rows` (number of rows returned).
    ///
    /// # Errors
    ///
    /// Returns the `tokio_postgres::Error` produced by the connection; `rows`
    /// is left unrecorded in that case.
    pub async fn query(
        &self,
        sql: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> Result<Vec<Row>, Error> {
        let span = tracing::debug_span!(
            "db.query",
            sql = %sql,
            params = params.len(),
            rows = tracing::field::Empty,
        );
        let rows = self
            .conn
            .query(sql, params)
            .instrument(span.clone())
            .await?;
        span.record("rows", rows.len());
        Ok(rows)
    }

    /// Runs a query inside a `db.query` debug span and returns the optional row
    /// produced by the connection's `query_opt`.
    ///
    /// On success `rows` is recorded as `1` when a row came back and `0`
    /// otherwise.
    ///
    /// # Errors
    ///
    /// Returns the `tokio_postgres::Error` produced by the connection; `rows`
    /// is left unrecorded in that case.
    pub async fn query_opt(
        &self,
        sql: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> Result<Option<Row>, Error> {
        let span = tracing::debug_span!(
            "db.query",
            sql = %sql,
            params = params.len(),
            rows = tracing::field::Empty,
        );
        let row = self
            .conn
            .query_opt(sql, params)
            .instrument(span.clone())
            .await?;
        span.record("rows", u64::from(row.is_some()));
        Ok(row)
    }

    /// Runs a query inside a `db.query` debug span and returns the single row
    /// produced by the connection's `query_one`.
    ///
    /// `rows` is recorded as `1` only after the call has succeeded, matching
    /// `query` and `query_opt`; a failed call no longer reports a row that was
    /// never returned.
    ///
    /// # Errors
    ///
    /// Returns the `tokio_postgres::Error` produced by the connection; `rows`
    /// is left unrecorded in that case.
    pub async fn query_one(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Row, Error> {
        let span = tracing::debug_span!(
            "db.query",
            sql = %sql,
            params = params.len(),
            rows = tracing::field::Empty,
        );
        let row = self
            .conn
            .query_one(sql, params)
            .instrument(span.clone())
            .await?;
        span.record("rows", 1u64);
        Ok(row)
    }
}
pub trait ConnectionExt: Connection + Sized {
fn traced(&self) -> TracedConn<'_, Self> {
TracedConn::new(self)
}
}
impl<C: Connection> ConnectionExt for C {}
/// Minimal query interface shared by the connection types this file wraps.
///
/// Implemented below for `tokio_postgres::Client` and
/// `deadpool_postgres::Object`. Every method returns a boxed, pinned, `Send`
/// future that borrows `self`, the SQL text and the parameter slice for the
/// same lifetime `'a`.
pub trait Connection: Send + Sync {
/// Runs a statement; the future resolves to the `u64` count reported by the
/// implementation (the tracing wrappers record it as `affected`).
fn execute<'a>(
&'a self,
sql: &'a str,
params: &'a [&'a (dyn ToSql + Sync)],
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, Error>> + Send + 'a>>;
/// Runs a query; the future resolves to all returned rows.
fn query<'a>(
&'a self,
sql: &'a str,
params: &'a [&'a (dyn ToSql + Sync)],
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Row>, Error>> + Send + 'a>>;
/// Runs a query; the future resolves to an optional row.
fn query_opt<'a>(
&'a self,
sql: &'a str,
params: &'a [&'a (dyn ToSql + Sync)],
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<Row>, Error>> + Send + 'a>>;
/// Runs a query; the future resolves to a single row.
fn query_one<'a>(
&'a self,
sql: &'a str,
params: &'a [&'a (dyn ToSql + Sync)],
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Row, Error>> + Send + 'a>>;
}
// Each method forwards to the inherent `tokio_postgres::Client` method of the
// same name and boxes the resulting future. The calls are written with the
// explicit `tokio_postgres::Client::` path, as they were originally, rather
// than as `self.method(..)`.
impl Connection for tokio_postgres::Client {
    /// Boxes the future of the inherent `tokio_postgres::Client::execute`.
    fn execute<'a>(
        &'a self,
        sql: &'a str,
        params: &'a [&'a (dyn ToSql + Sync)],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, Error>> + Send + 'a>> {
        let pending = tokio_postgres::Client::execute(self, sql, params);
        Box::pin(pending)
    }

    /// Boxes the future of the inherent `tokio_postgres::Client::query`.
    fn query<'a>(
        &'a self,
        sql: &'a str,
        params: &'a [&'a (dyn ToSql + Sync)],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Row>, Error>> + Send + 'a>>
    {
        let pending = tokio_postgres::Client::query(self, sql, params);
        Box::pin(pending)
    }

    /// Boxes the future of the inherent `tokio_postgres::Client::query_opt`.
    fn query_opt<'a>(
        &'a self,
        sql: &'a str,
        params: &'a [&'a (dyn ToSql + Sync)],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<Row>, Error>> + Send + 'a>>
    {
        let pending = tokio_postgres::Client::query_opt(self, sql, params);
        Box::pin(pending)
    }

    /// Boxes the future of the inherent `tokio_postgres::Client::query_one`.
    fn query_one<'a>(
        &'a self,
        sql: &'a str,
        params: &'a [&'a (dyn ToSql + Sync)],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Row, Error>> + Send + 'a>> {
        let pending = tokio_postgres::Client::query_one(self, sql, params);
        Box::pin(pending)
    }
}
// Each method deref-coerces the pooled object to the underlying
// `tokio_postgres::Client`, calls the client's inherent method of the same
// name, and boxes the resulting future.
impl Connection for deadpool_postgres::Object {
    /// Boxes the future of `tokio_postgres::Client::execute` on the pooled client.
    fn execute<'a>(
        &'a self,
        sql: &'a str,
        params: &'a [&'a (dyn ToSql + Sync)],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, Error>> + Send + 'a>> {
        let raw: &'a tokio_postgres::Client = self;
        Box::pin(tokio_postgres::Client::execute(raw, sql, params))
    }

    /// Boxes the future of `tokio_postgres::Client::query` on the pooled client.
    fn query<'a>(
        &'a self,
        sql: &'a str,
        params: &'a [&'a (dyn ToSql + Sync)],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Row>, Error>> + Send + 'a>>
    {
        let raw: &'a tokio_postgres::Client = self;
        Box::pin(tokio_postgres::Client::query(raw, sql, params))
    }

    /// Boxes the future of `tokio_postgres::Client::query_opt` on the pooled client.
    fn query_opt<'a>(
        &'a self,
        sql: &'a str,
        params: &'a [&'a (dyn ToSql + Sync)],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<Row>, Error>> + Send + 'a>>
    {
        let raw: &'a tokio_postgres::Client = self;
        Box::pin(tokio_postgres::Client::query_opt(raw, sql, params))
    }

    /// Boxes the future of `tokio_postgres::Client::query_one` on the pooled client.
    fn query_one<'a>(
        &'a self,
        sql: &'a str,
        params: &'a [&'a (dyn ToSql + Sync)],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Row, Error>> + Send + 'a>> {
        let raw: &'a tokio_postgres::Client = self;
        Box::pin(tokio_postgres::Client::query_one(raw, sql, params))
    }
}