use super::truncate_sql_bytes;
use super::types::{HookAction, QueryContext, QueryHook, QueryMonitor, QueryResult, QueryType};
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopMonitor;
impl QueryMonitor for NoopMonitor {
fn on_query_complete(&self, _ctx: &QueryContext, _duration: Duration, _result: &QueryResult) {}
}
#[derive(Debug, Clone)]
pub struct LoggingMonitor {
pub min_duration: Option<Duration>,
pub max_sql_length: Option<usize>,
pub prefix: String,
}
impl Default for LoggingMonitor {
fn default() -> Self {
Self {
min_duration: None,
max_sql_length: Some(200),
prefix: "[pgorm]".to_string(),
}
}
}
impl LoggingMonitor {
pub fn new() -> Self {
Self::default()
}
pub fn min_duration(mut self, duration: Duration) -> Self {
self.min_duration = Some(duration);
self
}
pub fn max_sql_length(mut self, len: usize) -> Self {
self.max_sql_length = Some(len);
self
}
pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
self.prefix = prefix.into();
self
}
pub(crate) fn truncate_sql(&self, sql: &str) -> String {
match self.max_sql_length {
Some(max) if sql.len() > max => format!("{}...", truncate_sql_bytes(sql, max)),
_ => sql.to_string(),
}
}
}
impl LoggingMonitor {
fn format_sql(&self, ctx: &QueryContext) -> String {
let canonical = self.truncate_sql(&ctx.canonical_sql);
if ctx.exec_sql != ctx.canonical_sql {
format!(
"canonical: {} | exec: {}",
canonical,
self.truncate_sql(&ctx.exec_sql)
)
} else {
canonical
}
}
}
impl QueryMonitor for LoggingMonitor {
fn on_query_complete(&self, ctx: &QueryContext, duration: Duration, result: &QueryResult) {
if let Some(min) = self.min_duration {
if duration < min {
return;
}
}
let sql = self.format_sql(ctx);
let tag = ctx.tag.as_deref().unwrap_or("-");
crate::error::pgorm_warn(&format!(
"{} [{:?}] [{}] {:?} | {} | {}",
self.prefix, ctx.query_type, tag, duration, result, sql
));
}
fn on_slow_query(&self, ctx: &QueryContext, duration: Duration) {
let sql = self.format_sql(ctx);
crate::error::pgorm_warn(&format!(
"{} SLOW QUERY [{:?}]: {:?} | {}",
self.prefix, ctx.query_type, duration, sql
));
}
}
#[derive(Debug)]
pub struct StatsMonitor {
total_queries: std::sync::atomic::AtomicU64,
failed_queries: std::sync::atomic::AtomicU64,
total_duration_nanos: std::sync::atomic::AtomicU64,
select_count: std::sync::atomic::AtomicU64,
insert_count: std::sync::atomic::AtomicU64,
update_count: std::sync::atomic::AtomicU64,
delete_count: std::sync::atomic::AtomicU64,
slowest: std::sync::Mutex<(u64, Option<String>)>,
stmt_cache_hits: std::sync::atomic::AtomicU64,
stmt_cache_misses: std::sync::atomic::AtomicU64,
stmt_prepare_count: std::sync::atomic::AtomicU64,
stmt_prepare_duration_nanos: std::sync::atomic::AtomicU64,
}
#[derive(Debug, Clone, Default)]
pub struct QueryStats {
pub total_queries: u64,
pub failed_queries: u64,
pub total_duration: Duration,
pub select_count: u64,
pub insert_count: u64,
pub update_count: u64,
pub delete_count: u64,
pub max_duration: Duration,
pub slowest_query: Option<String>,
pub stmt_cache_hits: u64,
pub stmt_cache_misses: u64,
pub stmt_prepare_count: u64,
pub stmt_prepare_duration: Duration,
}
impl StatsMonitor {
pub fn new() -> Self {
Self::default()
}
pub fn stats(&self) -> QueryStats {
use std::sync::atomic::Ordering;
let slowest = self.slowest.lock().unwrap_or_else(|e| e.into_inner());
let (max_nanos, slowest_query) = (slowest.0, slowest.1.clone());
drop(slowest);
QueryStats {
total_queries: self.total_queries.load(Ordering::Relaxed),
failed_queries: self.failed_queries.load(Ordering::Relaxed),
total_duration: Duration::from_nanos(self.total_duration_nanos.load(Ordering::Relaxed)),
select_count: self.select_count.load(Ordering::Relaxed),
insert_count: self.insert_count.load(Ordering::Relaxed),
update_count: self.update_count.load(Ordering::Relaxed),
delete_count: self.delete_count.load(Ordering::Relaxed),
max_duration: Duration::from_nanos(max_nanos),
slowest_query,
stmt_cache_hits: self.stmt_cache_hits.load(Ordering::Relaxed),
stmt_cache_misses: self.stmt_cache_misses.load(Ordering::Relaxed),
stmt_prepare_count: self.stmt_prepare_count.load(Ordering::Relaxed),
stmt_prepare_duration: Duration::from_nanos(
self.stmt_prepare_duration_nanos.load(Ordering::Relaxed),
),
}
}
pub fn reset(&self) {
use std::sync::atomic::Ordering;
self.total_queries.store(0, Ordering::Relaxed);
self.failed_queries.store(0, Ordering::Relaxed);
self.total_duration_nanos.store(0, Ordering::Relaxed);
self.select_count.store(0, Ordering::Relaxed);
self.insert_count.store(0, Ordering::Relaxed);
self.update_count.store(0, Ordering::Relaxed);
self.delete_count.store(0, Ordering::Relaxed);
*self.slowest.lock().unwrap_or_else(|e| e.into_inner()) = (0, None);
self.stmt_cache_hits.store(0, Ordering::Relaxed);
self.stmt_cache_misses.store(0, Ordering::Relaxed);
self.stmt_prepare_count.store(0, Ordering::Relaxed);
self.stmt_prepare_duration_nanos.store(0, Ordering::Relaxed);
}
pub fn on_stmt_cache_hit(&self) {
use std::sync::atomic::Ordering;
self.stmt_cache_hits.fetch_add(1, Ordering::Relaxed);
}
pub fn on_stmt_cache_miss(&self) {
use std::sync::atomic::Ordering;
self.stmt_cache_misses.fetch_add(1, Ordering::Relaxed);
}
pub fn on_stmt_prepare(&self, duration: Duration) {
use std::sync::atomic::Ordering;
let nanos = u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX);
self.stmt_prepare_count.fetch_add(1, Ordering::Relaxed);
self.stmt_prepare_duration_nanos
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |prev| {
Some(prev.saturating_add(nanos))
})
.ok();
}
}
impl Default for StatsMonitor {
fn default() -> Self {
Self {
total_queries: std::sync::atomic::AtomicU64::new(0),
failed_queries: std::sync::atomic::AtomicU64::new(0),
total_duration_nanos: std::sync::atomic::AtomicU64::new(0),
select_count: std::sync::atomic::AtomicU64::new(0),
insert_count: std::sync::atomic::AtomicU64::new(0),
update_count: std::sync::atomic::AtomicU64::new(0),
delete_count: std::sync::atomic::AtomicU64::new(0),
slowest: std::sync::Mutex::new((0, None)),
stmt_cache_hits: std::sync::atomic::AtomicU64::new(0),
stmt_cache_misses: std::sync::atomic::AtomicU64::new(0),
stmt_prepare_count: std::sync::atomic::AtomicU64::new(0),
stmt_prepare_duration_nanos: std::sync::atomic::AtomicU64::new(0),
}
}
}
impl QueryMonitor for StatsMonitor {
fn on_query_complete(&self, ctx: &QueryContext, duration: Duration, result: &QueryResult) {
use std::sync::atomic::Ordering;
let duration_nanos = u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX);
self.total_queries.fetch_add(1, Ordering::Relaxed);
self.total_duration_nanos
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |prev| {
Some(prev.saturating_add(duration_nanos))
})
.ok();
match ctx.query_type {
QueryType::Select => {
self.select_count.fetch_add(1, Ordering::Relaxed);
}
QueryType::Insert => {
self.insert_count.fetch_add(1, Ordering::Relaxed);
}
QueryType::Update => {
self.update_count.fetch_add(1, Ordering::Relaxed);
}
QueryType::Delete => {
self.delete_count.fetch_add(1, Ordering::Relaxed);
}
QueryType::Other => {}
}
if matches!(result, QueryResult::Error(_)) {
self.failed_queries.fetch_add(1, Ordering::Relaxed);
}
let mut slowest = self.slowest.lock().unwrap_or_else(|e| e.into_inner());
if duration_nanos > slowest.0 {
slowest.0 = duration_nanos;
slowest.1 = Some(ctx.canonical_sql.clone());
}
}
}
pub struct CompositeMonitor {
monitors: Vec<Arc<dyn QueryMonitor>>,
}
impl CompositeMonitor {
pub fn new() -> Self {
Self {
monitors: Vec::new(),
}
}
#[allow(clippy::should_implement_trait)]
pub fn add<M: QueryMonitor + 'static>(mut self, monitor: M) -> Self {
self.monitors.push(Arc::new(monitor));
self
}
pub fn add_arc(mut self, monitor: Arc<dyn QueryMonitor>) -> Self {
self.monitors.push(monitor);
self
}
}
impl Default for CompositeMonitor {
fn default() -> Self {
Self::new()
}
}
impl QueryMonitor for CompositeMonitor {
fn on_query_start(&self, ctx: &QueryContext) {
for monitor in &self.monitors {
monitor.on_query_start(ctx);
}
}
fn on_query_complete(&self, ctx: &QueryContext, duration: Duration, result: &QueryResult) {
for monitor in &self.monitors {
monitor.on_query_complete(ctx, duration, result);
}
}
fn on_slow_query(&self, ctx: &QueryContext, duration: Duration) {
for monitor in &self.monitors {
monitor.on_slow_query(ctx, duration);
}
}
}
pub struct CompositeHook {
hooks: Vec<Arc<dyn QueryHook>>,
}
impl CompositeHook {
pub fn new() -> Self {
Self { hooks: Vec::new() }
}
#[allow(clippy::should_implement_trait)]
pub fn add<H: QueryHook + 'static>(mut self, hook: H) -> Self {
self.hooks.push(Arc::new(hook));
self
}
pub fn add_arc(mut self, hook: Arc<dyn QueryHook>) -> Self {
self.hooks.push(hook);
self
}
}
impl Default for CompositeHook {
fn default() -> Self {
Self::new()
}
}
impl QueryHook for CompositeHook {
fn before_query(&self, ctx: &QueryContext) -> HookAction {
let mut owned: Option<QueryContext> = None;
for hook in &self.hooks {
let current = owned.as_ref().unwrap_or(ctx);
match hook.before_query(current) {
HookAction::Continue => {}
HookAction::ModifySql {
exec_sql,
canonical_sql,
} => {
let c = owned.get_or_insert_with(|| ctx.clone());
c.exec_sql = exec_sql;
if let Some(canonical_sql) = canonical_sql {
c.canonical_sql = canonical_sql;
}
c.query_type = QueryType::from_sql(&c.canonical_sql);
}
action @ HookAction::Abort(_) => return action,
}
}
match owned {
Some(c) => HookAction::ModifySql {
canonical_sql: (c.canonical_sql != ctx.canonical_sql).then_some(c.canonical_sql),
exec_sql: c.exec_sql,
},
None => HookAction::Continue,
}
}
fn after_query(&self, ctx: &QueryContext, duration: Duration, result: &QueryResult) {
for hook in &self.hooks {
hook.after_query(ctx, duration, result);
}
}
}