#![cfg_attr(not(any(feature = "sqlx", feature = "diesel")), allow(dead_code))]
use crossbeam_channel::{
bounded, unbounded, Receiver as CbReceiver, Select, Sender as CbSender, TryRecvError,
};
use hdrhistogram::Histogram;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex as StdMutex, OnceLock, RwLock as StdRwLock};
use crate::batch::{register_thread_batch, BatchRegistry, BatchedMeasurement, MeasurementBatch};
use crate::instant::Instant;
use crate::lib_on::hotpath_guard::{
WORKER_BATCH_SIZE, WORKER_FLUSH_INTERVAL_MS, WORKER_SHUTDOWN_DRAIN_LIMIT,
};
use crate::lib_on::START_TIME;
use crate::metrics_server::METRICS_SERVER_PORT;
#[cfg(feature = "diesel")]
pub(crate) mod diesel;
pub(crate) mod normalize;
#[cfg(feature = "sqlx")]
pub(crate) mod tracing_layer;
#[cfg(feature = "diesel")]
pub use diesel::instrument_diesel_sql;
#[cfg(feature = "sqlx")]
pub use tracing_layer::sqlx_tracing_layer;
static SQL_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
fn next_sql_id() -> u32 {
SQL_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
#[derive(Debug)]
pub(crate) enum SqlEvent {
Executed {
sql: Arc<str>,
duration_nanos: u64,
elapsed_ns: u64,
},
}
#[derive(Debug, Clone)]
pub(crate) struct SqlEntry {
pub(crate) id: u32,
pub(crate) query: String,
pub(crate) count: u64,
pub(crate) total_nanos: u64,
hist: Option<Histogram<u64>>,
}
impl SqlEntry {
const LOW_NS: u64 = 1;
const HIGH_NS: u64 = 1_000_000_000_000; const SIGFIGS: u8 = 3;
fn new(id: u32, query: String) -> Self {
Self {
id,
query,
count: 0,
total_nanos: 0,
hist: Histogram::<u64>::new_with_bounds(Self::LOW_NS, Self::HIGH_NS, Self::SIGFIGS)
.ok(),
}
}
#[inline]
fn record(&mut self, nanos: u64) {
if let Some(ref mut hist) = self.hist {
hist.record(nanos.clamp(Self::LOW_NS, Self::HIGH_NS))
.unwrap();
}
}
pub(crate) fn avg_nanos(&self) -> u64 {
self.total_nanos.checked_div(self.count).unwrap_or(0)
}
pub(crate) fn percentile_nanos(&self, p: f64) -> u64 {
match self.hist {
Some(ref hist) if self.count > 0 => hist.value_at_percentile(p.clamp(0.0, 100.0)),
_ => 0,
}
}
}
pub(crate) struct SqlInternalState {
pub(crate) stats: HashMap<String, SqlEntry>,
}
#[cfg(feature = "hotpath-meta")]
pub(crate) type SqlEventTx = hotpath_meta::wrap::crossbeam::Sender<Vec<SqlEvent>>;
#[cfg(not(feature = "hotpath-meta"))]
pub(crate) type SqlEventTx = CbSender<Vec<SqlEvent>>;
pub(crate) struct SqlState {
pub(crate) event_tx: SqlEventTx,
pub(crate) inner: Arc<StdRwLock<SqlInternalState>>,
pub(crate) shutdown_tx: StdMutex<Option<CbSender<()>>>,
pub(crate) completion_rx: StdMutex<Option<CbReceiver<()>>>,
}
pub(crate) static SQL_STATE: OnceLock<SqlState> = OnceLock::new();
pub(crate) fn get_sorted_sql_entries() -> Vec<SqlEntry> {
let Some(state) = SQL_STATE.get() else {
return Vec::new();
};
let guard = state.inner.read().unwrap();
let mut stats: Vec<SqlEntry> = guard.stats.values().cloned().collect();
stats.sort_by(compare_sql_entries);
stats
}
pub(crate) fn get_sql_json() -> crate::json::JsonSqlList {
let entries = get_sorted_sql_entries();
let elapsed = std::time::Duration::from_nanos(crate::lib_on::current_elapsed_ns());
let reference_total: u64 = entries.iter().map(|e| e.total_nanos).sum();
crate::lib_on::report::collect_sql_json(
&entries,
elapsed,
reference_total,
&crate::lib_on::hotpath_guard::configured_percentiles(),
)
}
static EVENT_REGISTRY: BatchRegistry<SqlEvent> = BatchRegistry::new();
thread_local! {
static EVENT_BATCH: std::sync::Arc<std::sync::Mutex<MeasurementBatch<SqlEvent>>> =
register_thread_batch(&EVENT_REGISTRY);
}
#[inline]
pub(crate) fn send_sql_event(event: SqlEvent) {
let _suspend = crate::lib_on::SuspendAllocTracking::new();
EVENT_BATCH.with(|b| {
if let Ok(mut b) = b.lock() {
b.add(event);
}
});
}
pub(crate) fn flush_sql_batch() {
EVENT_REGISTRY.flush_all();
}
impl BatchedMeasurement for SqlEvent {
type Tx = SqlEventTx;
fn elapsed_since_start_ns(&self) -> u64 {
match self {
SqlEvent::Executed { elapsed_ns, .. } => *elapsed_ns,
}
}
fn fetch_sender() -> Option<Self::Tx> {
Some(SQL_STATE.get()?.event_tx.clone())
}
fn send_batch(tx: &Self::Tx, batch: Vec<Self>) {
let _ = tx.send(batch);
}
fn is_flush_boundary(&self) -> bool {
false
}
}
fn process_sql_event(state: &mut SqlInternalState, event: SqlEvent) {
let SqlEvent::Executed {
sql,
duration_nanos,
elapsed_ns: _,
} = event;
let key = normalize::normalize(&sql);
let entry = state
.stats
.entry(key.clone())
.or_insert_with(|| SqlEntry::new(next_sql_id(), key));
entry.count += 1;
entry.total_nanos += duration_nanos;
entry.record(duration_nanos);
}
fn flush_sql_buffer(buffer: &mut Vec<SqlEvent>, inner: &Arc<StdRwLock<SqlInternalState>>) {
if buffer.is_empty() {
return;
}
if let Ok(mut shared) = inner.write() {
for e in buffer.drain(..) {
process_sql_event(&mut shared, e);
}
}
}
pub(crate) fn init_sql_state() -> &'static SqlState {
SQL_STATE.get_or_init(|| {
START_TIME.get_or_init(Instant::now);
let (event_tx, event_rx) = unbounded::<Vec<SqlEvent>>();
#[cfg(feature = "hotpath-meta")]
let (event_tx, event_rx) =
hotpath_meta::channel!((event_tx, event_rx), wrap = true, label = "hp-sql-events");
let (shutdown_tx, shutdown_rx) = bounded::<()>(1);
let (completion_tx, completion_rx) = bounded::<()>(1);
let inner = Arc::new(StdRwLock::new(SqlInternalState {
stats: HashMap::new(),
}));
let inner_clone = Arc::clone(&inner);
std::thread::Builder::new()
.name("hp-sql".into())
.spawn(move || {
let mut local_buffer: Vec<SqlEvent> = Vec::with_capacity(WORKER_BATCH_SIZE);
let flush_interval = std::time::Duration::from_millis(WORKER_FLUSH_INTERVAL_MS);
let mut select = Select::new();
let _shutdown_idx = select.recv(&shutdown_rx);
#[cfg(feature = "hotpath-meta")]
let _event_idx = select.recv(event_rx.select_handle());
#[cfg(not(feature = "hotpath-meta"))]
let _event_idx = select.recv(&event_rx);
loop {
if select.ready_timeout(flush_interval).is_err() {
flush_sql_buffer(&mut local_buffer, &inner_clone);
continue;
}
if !matches!(shutdown_rx.try_recv(), Err(TryRecvError::Empty)) {
for _ in 0..WORKER_SHUTDOWN_DRAIN_LIMIT {
match event_rx.try_recv() {
Ok(events) => local_buffer.extend(events),
Err(_) => break,
}
}
flush_sql_buffer(&mut local_buffer, &inner_clone);
break;
}
match event_rx.try_recv() {
Ok(events) => {
local_buffer.extend(events);
if local_buffer.len() >= WORKER_BATCH_SIZE {
flush_sql_buffer(&mut local_buffer, &inner_clone);
}
}
Err(TryRecvError::Disconnected) => {
flush_sql_buffer(&mut local_buffer, &inner_clone);
break;
}
Err(TryRecvError::Empty) => {}
}
}
let _ = completion_tx.send(());
})
.expect("Failed to spawn sql-stats-collector thread");
crate::metrics_server::start_metrics_server_once(*METRICS_SERVER_PORT);
SqlState {
event_tx,
inner,
shutdown_tx: StdMutex::new(Some(shutdown_tx)),
completion_rx: StdMutex::new(Some(completion_rx)),
}
})
}
pub(crate) fn compare_sql_entries(a: &SqlEntry, b: &SqlEntry) -> std::cmp::Ordering {
b.total_nanos
.cmp(&a.total_nanos)
.then_with(|| b.count.cmp(&a.count))
.then_with(|| a.id.cmp(&b.id))
}