use crate::collectors::Collector;
use anyhow::Result;
use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};
use prometheus::Registry;
use sqlx::PgPool;
use std::sync::Arc;
use tracing::{debug, info_span, instrument, warn};
use tracing_futures::Instrument as _;
pub mod pg_statements;
use pg_statements::PgStatementsCollector;
/// Composite collector that groups the statement-related sub-collectors and
/// exposes them through a single [`Collector`] implementation.
///
/// Cloning copies the `Vec` of `Arc` handles, so clones share the same
/// sub-collector instances.
#[derive(Clone)]
pub struct StatementsCollector {
/// Sub-collectors that metric registration and collection are delegated to.
subs: Vec<Arc<dyn Collector + Send + Sync>>,
}
impl StatementsCollector {
#[must_use]
pub(crate) fn new() -> Self {
Self::with_top_n(25)
}
#[must_use]
pub fn with_top_n(top_n: usize) -> Self {
Self {
subs: vec![Arc::new(PgStatementsCollector::with_top_n(top_n))],
}
}
}
impl Collector for StatementsCollector {
    /// Returns the fixed identifier of this collector, `"statements"`.
    fn name(&self) -> &'static str {
        "statements"
    }

    /// Registers the metrics of every sub-collector with `registry`.
    ///
    /// Each sub-collector is registered while its own
    /// `collector.register_metrics` span is entered, so the registration work
    /// and the success/failure events emitted here are attributed to that span.
    ///
    /// # Errors
    ///
    /// Returns the error of the first sub-collector whose registration fails;
    /// sub-collectors after it are not attempted.
    #[instrument(
        skip(self, registry),
        level = "info",
        err,
        fields(collector = "statements")
    )]
    fn register_metrics(&self, registry: &Registry) -> Result<()> {
        for sub in &self.subs {
            let span = info_span!("collector.register_metrics", sub_collector = %sub.name());
            // A span only records what happens while it is entered. Holding the
            // guard for the whole iteration keeps both the registration call and
            // the events below inside it; the guard is released at the end of
            // the iteration or on early return. This is safe because the
            // function is synchronous (no `.await` while the guard is held).
            let _entered = span.enter();

            match sub.register_metrics(registry) {
                Ok(()) => {
                    debug!(collector = sub.name(), "registered metrics");
                }
                Err(e) => {
                    warn!(collector = sub.name(), error = %e, "failed to register metrics");
                    return Err(e);
                }
            }
        }
        Ok(())
    }

    /// Runs `collect` on every sub-collector concurrently against `pool`.
    ///
    /// Each sub-collector future is instrumented with its own
    /// `collector.collect` span. Results are consumed in completion order.
    ///
    /// # Errors
    ///
    /// Resolves to the first error produced by any sub-collector. Returning
    /// early drops the futures that have not finished yet.
    fn collect<'a>(&'a self, pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            let mut tasks = FuturesUnordered::new();
            for sub in &self.subs {
                let span = info_span!(
                    "collector.collect",
                    sub_collector = %sub.name(),
                    otel.kind = "internal"
                );
                // `instrument` enters the span on every poll of the future,
                // which is the correct way to attach a span to async work.
                tasks.push(sub.collect(pool).instrument(span));
            }
            while let Some(res) = tasks.next().await {
                res?;
            }
            Ok(())
        })
    }

    /// Reports that this collector is disabled unless explicitly enabled.
    fn enabled_by_default(&self) -> bool {
        false
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The collector reports the fixed name `"statements"`.
    #[test]
    fn test_statements_collector_name() {
        let name = StatementsCollector::with_top_n(25).name();
        assert_eq!(name, "statements");
    }

    /// The collector reports itself as disabled by default.
    #[test]
    fn test_statements_collector_not_enabled_by_default() {
        let enabled = StatementsCollector::with_top_n(25).enabled_by_default();
        assert!(!enabled);
    }
}