use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
/// Handle wrapping a shared boolean shutdown flag.
///
/// Cloning the handle clones the inner [`Arc`], so every clone refers to the
/// same underlying [`AtomicBool`] rather than to a copy of its value.
#[derive(Clone)]
pub(crate) struct ShutdownHandle {
    /// Flag shared between this handle, its clones, and whoever else holds
    /// the same `Arc`.
    flag: Arc<AtomicBool>,
}
impl Drop for ShutdownHandle {
fn drop(&mut self) {
self.flag.store(true, Ordering::Relaxed);
}
}
/// Opaque `Debug` output: prints the type name and elides all fields.
impl std::fmt::Debug for ShutdownHandle {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // No fields are added, so the result is `ShutdownHandle { .. }`.
        let mut builder = formatter.debug_struct("ShutdownHandle");
        builder.finish_non_exhaustive()
    }
}
/// Starts a task on runtime `R` that periodically records connection counts
/// for `pool` into the `db.client.connection.count` gauge.
///
/// * `pool` - pool to sample; it is moved into the spawned task.
/// * `pool_name` - value recorded under the connection-pool-name attribute on
///   every data point.
/// * `interval` - time slept before each sample, so the first data point is
///   recorded only after one full interval has elapsed.
///
/// Returns a [`ShutdownHandle`] sharing a flag with the task. The task checks
/// the flag after every sleep and exits its loop once the flag reads `true`.
/// Whatever `R::spawn` returns is discarded.
#[cfg(any(feature = "runtime-tokio", feature = "runtime-async-std"))]
pub(crate) fn spawn<R: crate::runtime::Runtime, DB: sqlx::Database>(
    pool: sqlx::Pool<DB>,
    pool_name: String,
    interval: std::time::Duration,
) -> ShutdownHandle {
    use opentelemetry::KeyValue;
    use opentelemetry::metrics::{Gauge, Meter};
    use opentelemetry_semantic_conventions::{attribute, metric};

    /// Returns `base_attrs` plus a connection-state attribute set to `state`.
    fn attrs_with_state(base_attrs: &[KeyValue], state: &'static str) -> Vec<KeyValue> {
        let mut attrs = Vec::with_capacity(base_attrs.len() + 1);
        attrs.extend_from_slice(base_attrs);
        attrs.push(KeyValue::new(attribute::DB_CLIENT_CONNECTION_STATE, state));
        attrs
    }

    /// Records one "idle" and one "used" data point for `pool`.
    fn record<D: sqlx::Database>(
        count: &Gauge<i64>,
        pool: &sqlx::Pool<D>,
        base_attrs: &[KeyValue],
    ) {
        // Saturate instead of wrapping if the idle count ever exceeds `i64::MAX`.
        let idle = i64::try_from(pool.num_idle()).unwrap_or(i64::MAX);
        let total = i64::from(pool.size());
        // `num_idle()` and `size()` are read separately, so the pool may change
        // between the two calls; clamp so a negative count is never reported.
        let used = (total - idle).max(0);

        count.record(idle, &attrs_with_state(base_attrs, "idle"));
        count.record(used, &attrs_with_state(base_attrs, "used"));
    }

    let shutdown = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&shutdown);

    R::spawn(async move {
        let meter: Meter = opentelemetry::global::meter("sqlx-otel");
        let count = meter
            .i64_gauge(metric::DB_CLIENT_CONNECTION_COUNT)
            .with_description("The number of connections currently in the state described by the `db.client.connection.state` attribute.")
            .build();
        let base_attrs = vec![KeyValue::new(
            attribute::DB_CLIENT_CONNECTION_POOL_NAME,
            pool_name,
        )];

        loop {
            R::sleep(interval).await;
            // Check after sleeping so a shutdown raised during the sleep
            // suppresses the next sample.
            if shutdown.load(Ordering::Relaxed) {
                break;
            }
            record(&count, &pool, &base_attrs);
        }
    });

    ShutdownHandle { flag }
}