use std::sync::Arc;
use std::time::Duration;
use opentelemetry_semantic_conventions::metric as semconv_metric;
use crate::annotations::{Annotated, QueryAnnotations};
use crate::attributes::{ConnectionAttributes, QueryTextMode};
use crate::connection::PoolConnection;
use crate::database::Database;
use crate::metrics::Metrics;
use crate::transaction::Transaction;
/// Telemetry state shared by a pool and the wrappers it hands out.
///
/// `Pool::acquire`, `Pool::begin` and `Pool::with_annotations` each clone this
/// value into the wrapper they return. Both fields are `Arc`s, so cloning only
/// bumps reference counts.
#[derive(Debug, Clone)]
pub(crate) struct SharedState {
/// Connection-level attributes assembled once in `PoolBuilder::build`.
pub attrs: Arc<ConnectionAttributes>,
/// Metrics handle created with `Metrics::new()` in `PoolBuilder::build`.
pub metrics: Arc<Metrics>,
}
/// Builder that turns an existing `sqlx::Pool` into an instrumented `Pool`.
///
/// Obtained through `From<sqlx::Pool<DB>>`, which pre-fills `host`, `port` and
/// `namespace` from `DB::connection_attributes`. The `with_*` methods override
/// individual settings before `build` is called.
#[derive(Debug)]
pub struct PoolBuilder<DB: sqlx::Database> {
/// The underlying sqlx pool being wrapped.
pool: sqlx::Pool<DB>,
/// Database server host, if known.
host: Option<String>,
/// Database server port, if known.
port: Option<u16>,
/// Database namespace; overridden by `with_database`.
namespace: Option<String>,
/// Network peer address; `None` unless set via `with_network_peer_address`.
network_peer_address: Option<String>,
/// Network peer port; `None` unless set via `with_network_peer_port`.
network_peer_port: Option<u16>,
/// Query-text mode copied into the connection attributes by `build`.
query_text_mode: QueryTextMode,
/// Pool name; when `None`, no pool-metrics task is spawned by `build`.
pool_name: Option<String>,
/// Interval handed to the pool-metrics task (initialised to 10 seconds).
pool_metrics_interval: Duration,
}
impl<DB: Database> From<sqlx::Pool<DB>> for PoolBuilder<DB> {
fn from(pool: sqlx::Pool<DB>) -> Self {
let (host, port, namespace) = DB::connection_attributes(&pool);
Self {
pool,
host,
port,
namespace,
network_peer_address: None,
network_peer_port: None,
query_text_mode: QueryTextMode::default(),
pool_name: None,
pool_metrics_interval: Duration::from_secs(10),
}
}
}
impl<DB: Database> PoolBuilder<DB> {
    /// Overrides the database namespace reported in the connection attributes.
    #[must_use]
    pub fn with_database(self, database: impl Into<String>) -> Self {
        Self {
            namespace: Some(database.into()),
            ..self
        }
    }

    /// Overrides the server host reported in the connection attributes.
    #[must_use]
    pub fn with_host(self, host: impl Into<String>) -> Self {
        Self {
            host: Some(host.into()),
            ..self
        }
    }

    /// Overrides the server port reported in the connection attributes.
    #[must_use]
    pub fn with_port(self, port: u16) -> Self {
        Self {
            port: Some(port),
            ..self
        }
    }

    /// Sets the network peer address reported in the connection attributes.
    #[must_use]
    pub fn with_network_peer_address(self, address: impl Into<String>) -> Self {
        Self {
            network_peer_address: Some(address.into()),
            ..self
        }
    }

    /// Sets the network peer port reported in the connection attributes.
    #[must_use]
    pub fn with_network_peer_port(self, port: u16) -> Self {
        Self {
            network_peer_port: Some(port),
            ..self
        }
    }

    /// Selects the [`QueryTextMode`] stored in the connection attributes.
    #[must_use]
    pub fn with_query_text_mode(self, mode: QueryTextMode) -> Self {
        Self {
            query_text_mode: mode,
            ..self
        }
    }

    /// Names the pool.
    ///
    /// A name is required for the periodic pool-metrics task: without one,
    /// [`build`](Self::build) does not spawn it.
    #[must_use]
    pub fn with_pool_name(self, name: impl Into<String>) -> Self {
        Self {
            pool_name: Some(name.into()),
            ..self
        }
    }

    /// Sets the interval handed to the pool-metrics task (10 seconds unless
    /// overridden).
    #[must_use]
    pub fn with_pool_metrics_interval(self, interval: Duration) -> Self {
        Self {
            pool_metrics_interval: interval,
            ..self
        }
    }

    /// Consumes the builder and produces the instrumented [`Pool`].
    ///
    /// In order, this:
    /// 1. spawns the pool-metrics task (only when a pool name was set and a
    ///    runtime feature is enabled),
    /// 2. freezes the connection attributes and creates the shared metrics,
    /// 3. records the configured connection limits once as gauges, and
    /// 4. creates the per-acquire instruments stored on the pool.
    #[must_use]
    pub fn build(self) -> Pool<DB> {
        // Has to happen before `self` is taken apart: the spawn borrows the
        // pool handle, the pool name and the interval.
        let metrics_shutdown = self.spawn_pool_metrics_task();

        let Self {
            pool,
            host,
            port,
            namespace,
            network_peer_address,
            network_peer_port,
            query_text_mode,
            ..
        } = self;

        let attrs = Arc::new(ConnectionAttributes {
            system: DB::SYSTEM,
            host,
            port,
            namespace,
            network_peer_address,
            network_peer_port,
            query_text_mode,
        });
        let metrics = Arc::new(Metrics::new());
        let meter = opentelemetry::global::meter("sqlx-otel");

        let options = pool.options();
        let max_conns = i64::from(options.get_max_connections());
        let min_conns = i64::from(options.get_min_connections());
        let base_attrs = attrs.base_key_values();

        // Static pool limits, recorded a single time at build. The idle
        // maximum is reported with the same value as the overall maximum.
        let limit_gauges = [
            (
                semconv_metric::DB_CLIENT_CONNECTION_MAX,
                "The maximum number of open connections allowed.",
                max_conns,
            ),
            (
                semconv_metric::DB_CLIENT_CONNECTION_IDLE_MAX,
                "The maximum number of idle open connections allowed.",
                max_conns,
            ),
            (
                semconv_metric::DB_CLIENT_CONNECTION_IDLE_MIN,
                "The minimum number of idle open connections allowed.",
                min_conns,
            ),
        ];
        for (instrument_name, description, value) in limit_gauges {
            meter
                .i64_gauge(instrument_name)
                .with_description(description)
                .build()
                .record(value, &base_attrs);
        }

        let wait_time = meter
            .f64_histogram(semconv_metric::DB_CLIENT_CONNECTION_WAIT_TIME)
            .with_unit("s")
            .with_description("The time it took to obtain an open connection from the pool.")
            .build();
        let use_time = meter
            .f64_histogram(semconv_metric::DB_CLIENT_CONNECTION_USE_TIME)
            .with_unit("s")
            .with_description(
                "The time between borrowing a connection and returning it to the pool.",
            )
            .build();
        let timeouts = meter
            .u64_counter(semconv_metric::DB_CLIENT_CONNECTION_TIMEOUTS)
            .with_description("The number of connection pool acquire attempts that timed out.")
            .build();
        let pending_requests = meter
            .i64_up_down_counter(semconv_metric::DB_CLIENT_CONNECTION_PENDING_REQUESTS)
            .with_description("The number of pending requests for an open connection.")
            .build();

        Pool {
            inner: pool,
            state: SharedState { attrs, metrics },
            metrics_shutdown,
            wait_time: Arc::new(wait_time),
            use_time: Arc::new(use_time),
            timeouts: Arc::new(timeouts),
            pending_requests: Arc::new(pending_requests),
        }
    }

    /// Spawns the periodic pool-metrics task on whichever runtime feature is
    /// enabled, preferring Tokio when both are.
    ///
    /// Returns `None` when no pool name was configured, or when neither
    /// `runtime-tokio` nor `runtime-async-std` is enabled.
    fn spawn_pool_metrics_task(&self) -> Option<crate::pool_metrics::ShutdownHandle> {
        let name = self.pool_name.as_deref()?;

        #[cfg(feature = "runtime-tokio")]
        {
            Some(crate::pool_metrics::spawn::<
                crate::runtime::TokioRuntime,
                DB,
            >(
                self.pool.clone(),
                name.to_owned(),
                self.pool_metrics_interval,
            ))
        }

        #[cfg(all(feature = "runtime-async-std", not(feature = "runtime-tokio")))]
        {
            Some(crate::pool_metrics::spawn::<
                crate::runtime::AsyncStdRuntime,
                DB,
            >(
                self.pool.clone(),
                name.to_owned(),
                self.pool_metrics_interval,
            ))
        }

        #[cfg(not(any(feature = "runtime-tokio", feature = "runtime-async-std")))]
        {
            // No runtime to spawn on; mark the name as used to avoid a warning.
            let _ = name;
            None
        }
    }
}
/// Instrumented wrapper around a `sqlx::Pool`, produced by `PoolBuilder::build`.
///
/// Besides the wrapped pool it carries the shared telemetry state and the
/// instruments that `acquire` records into.
#[derive(Debug)]
pub struct Pool<DB: sqlx::Database> {
/// The wrapped sqlx pool that all operations delegate to.
pub(crate) inner: sqlx::Pool<DB>,
/// Attributes and metrics cloned into every connection, transaction and
/// annotated wrapper created from this pool.
pub(crate) state: SharedState,
/// Handle for the pool-metrics task; `None` when the task was not spawned
/// (no pool name configured, or no runtime feature enabled).
metrics_shutdown: Option<crate::pool_metrics::ShutdownHandle>,
/// Histogram of how long `acquire` waited on the underlying pool, in seconds.
wait_time: Arc<opentelemetry::metrics::Histogram<f64>>,
/// Use-time histogram; a clone is handed to each `PoolConnection` by `acquire`.
pub(crate) use_time: Arc<opentelemetry::metrics::Histogram<f64>>,
/// Counter incremented when `acquire` fails with `sqlx::Error::PoolTimedOut`.
timeouts: Arc<opentelemetry::metrics::Counter<u64>>,
/// Up-down counter tracking `acquire` calls currently waiting on the pool.
pending_requests: Arc<opentelemetry::metrics::UpDownCounter<i64>>,
}
impl<DB: sqlx::Database> Clone for Pool<DB> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
state: self.state.clone(),
metrics_shutdown: self.metrics_shutdown.clone(),
wait_time: self.wait_time.clone(),
use_time: self.use_time.clone(),
timeouts: self.timeouts.clone(),
pending_requests: self.pending_requests.clone(),
}
}
}
impl<DB: Database> Pool<DB> {
pub async fn acquire(&self) -> Result<PoolConnection<DB>, sqlx::Error> {
let attrs = self.state.attrs.base_key_values();
self.pending_requests.add(1, &attrs);
let start = std::time::Instant::now();
let result = self.inner.acquire().await;
self.pending_requests.add(-1, &attrs);
self.wait_time.record(start.elapsed().as_secs_f64(), &attrs);
if let Err(sqlx::Error::PoolTimedOut) = &result {
self.timeouts.add(1, &attrs);
}
result.map(|inner| PoolConnection {
inner,
state: self.state.clone(),
use_time: self.use_time.clone(),
acquired_at: std::time::Instant::now(),
base_attrs: attrs,
})
}
pub async fn begin(&self) -> Result<Transaction<'_, DB>, sqlx::Error> {
self.inner.begin().await.map(|inner| Transaction {
inner,
state: self.state.clone(),
})
}
pub async fn close(&self) {
self.inner.close().await;
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
#[must_use]
pub fn with_annotations(&self, annotations: QueryAnnotations) -> Annotated<'_, Self> {
Annotated {
inner: self,
annotations,
state: self.state.clone(),
}
}
#[must_use]
pub fn with_operation(
&self,
operation: impl Into<String>,
collection: impl Into<String>,
) -> Annotated<'_, Self> {
self.with_annotations(
QueryAnnotations::new()
.operation(operation)
.collection(collection),
)
}
}