saola-quaint 0.2.0-alpha.14

An abstraction layer for SQL databases (PostgreSQL, MySQL, SQLite, MSSQL)
use std::future::Future;

use crosstarget_utils::time::ElapsedTimeCounter;
#[cfg(feature = "metrics")]
use prisma_metrics::{counter, histogram};
#[cfg(feature = "telemetry")]
use telemetry::formatting::QueryForTracing;
use tracing::{Instrument, info_span};

#[cfg(not(feature = "metrics"))]
macro_rules! counter {
    ($($arg:tt)*) => {
        $crate::connector::metrics::Dummy::default()
    };
}

#[cfg(not(feature = "metrics"))]
macro_rules! histogram {
    ($($arg:tt)*) => {
        $crate::connector::metrics::Dummy::default()
    };
}


#[cfg(not(feature = "metrics"))]
#[derive(Default)]
pub struct Dummy;

#[cfg(not(feature = "metrics"))]
impl Dummy {
    pub fn increment(&self, _: u64) {}
    pub fn record(&self, _: std::time::Duration) {}
}

#[cfg(not(feature = "telemetry"))]
pub struct QueryForTracing<'a>(pub &'a str);

#[cfg(not(feature = "telemetry"))]
impl std::fmt::Display for QueryForTracing<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}

use crate::ast::{Params, Value};

pub async fn query<'a, F, T, U>(
    tag: &'static str,
    db_system_name: &'static str,
    query: &'a str,
    params: &'a [Value<'_>],
    f: F,
) -> crate::Result<T>
where
    F: FnOnce() -> U + 'a,
    U: Future<Output = crate::Result<T>>,
{
    let span = info_span!(
        "quaint:query",
        "db.system" = db_system_name,
        "db.query.text" = %QueryForTracing(query),
        "otel.kind" = "client",
        "otel.name" = "prisma:engine:db_query",
        user_facing = true,
    );
    do_query(tag, query, params, f).instrument(span).await
}

async fn do_query<'a, F, T, U>(_tag: &'static str, query: &'a str, params: &'a [Value<'_>], f: F) -> crate::Result<T>
where
    F: FnOnce() -> U + 'a,
    U: Future<Output = crate::Result<T>>,
{
    let start = ElapsedTimeCounter::start();
    let res = f().await;

    let result = match res {
        Ok(_) => "success",
        Err(_) => "error",
    };

    #[cfg(feature = "fmt-sql")]
    {
        if std::env::var("FMT_SQL").is_ok() {
            let query_fmt = sqlformat::format(
                query,
                &sqlformat::QueryParams::None,
                sqlformat::FormatOptions::default(),
            );

            trace_query(&query_fmt, params, result, &start);
        } else {
            trace_query(query, params, result, &start);
        };
    }

    #[cfg(not(feature = "fmt-sql"))]
    {
        trace_query(query, params, result, &start);
    }

    histogram!(format!("{_tag}.query.time")).record(start.elapsed_time());
    histogram!("prisma_datasource_queries_duration_histogram_ms").record(start.elapsed_time());
    counter!("prisma_datasource_queries_total").increment(1);

    res
}

#[cfg(feature = "pooled")]
pub(crate) async fn check_out<F, T>(f: F) -> std::result::Result<T, mobc::Error<crate::error::Error>>
where
    F: Future<Output = std::result::Result<T, mobc::Error<crate::error::Error>>>,
{
    let start = ElapsedTimeCounter::start();
    let res = f.await;

    let result = match res {
        Ok(_) => "success",
        Err(_) => "error",
    };

    tracing::trace!(
        message = "Fetched a connection from the pool",
        duration_ms = start.elapsed_time().as_millis() as u64,
        is_query = true,
        result,
    );

    histogram!("pool.check_out").record(start.elapsed_time());

    res
}

fn trace_query<'a>(query: &'a str, params: &'a [Value<'_>], result: &str, start: &ElapsedTimeCounter) {
    tracing::debug!(
        query = %query,
        params = %Params(params),
        result,
        item_type = "query",
        is_query = true,
        duration_ms = start.elapsed_time().as_millis() as u64,
    );
}