athena_rs 3.6.1

Hyper performant polyglot Database driver
Documentation
//! Prometheus-emission layer for [`ConnectionPoolSnapshot`].
//!
//! Writing pool metrics is something you *do* to a snapshot. This file hosts
//! that verb plus a small [`PoolMetricsAggregate`] that accumulates cross-pool
//! totals so the exporter never has to re-inspect `PgPool` handles directly.
//!
//! # Contract
//!
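//! - Every line written is valid Prometheus text-format (the emitter only
//!   produces gauges, numeric values, and quoted label pairs), provided the
//!   caller-supplied `labels` string is itself well-formed: label values must
//!   already have `\`, `"`, and newlines escaped before being passed in.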
//! - HELP/TYPE preamble is emitted **exactly once per scrape** by
//!   [`ConnectionPoolSnapshot::write_prometheus_preamble`] — per the
//!   Prometheus spec, repeating HELP/TYPE for the same metric name is a
//!   protocol violation.
//! - Label cardinality is stable across scrapes: clients whose pool is
//!   absent still get a zero-block via [`ConnectionPoolSnapshot::write_prometheus_absent`]
//!   so gauges never disappear mid-series.
//!
//! # Metric surface
//!
//! All gauges, all prefixed `athena_pg_pool_*`:
//!
//! | Metric | Meaning |
//! | --- | --- |
//! | `athena_pg_pool_up` | `1` when a snapshot was captured, `0` when absent. |
//! | `athena_pg_pool_closed` | Mirrors `PgPool::is_closed()` at capture time. |
//! | `athena_pg_pool_connections{state="size\|idle\|active"}` | Per-state counts. |
//! | `athena_pg_pool_max_connections` | Configured upper bound (from `PgPoolOptions`). |
//! | `athena_pg_pool_connection_utilization_ratio` | `active / size`. |
//! | `athena_pg_pool_idle_ratio` | `idle / size`. |
//! | `athena_pg_pool_saturation_ratio` | `size / max_connections` (pressure vs ceiling). |
//! | `athena_pg_pool_headroom` | `max_connections - size` (free capacity). |
//! | `athena_pg_pool_snapshot_age_seconds` | `now - recorded_at` (staleness detector). |
//!
//! Totals (no per-client labels) are emitted by
//! [`PoolMetricsAggregate::write_totals`]:
//!
//! | Metric | Meaning |
//! | --- | --- |
//! | `athena_pg_pool_connections_total{state=...}` | Sum of per-state counts across all pools. |
//! | `athena_pg_pool_max_connections_total` | Sum of configured `max_connections` across all pools. |
//! | `athena_pg_pool_overall_utilization_ratio` | Fleet-wide `active / size`. |
//! | `athena_pg_pool_headroom_total` | Fleet-wide free capacity (`sum(max) - sum(size)`). |
//!
//! # Example
//!
//! ```no_run
//! # use athena_rs::features::connection_pooler::ConnectionPoolSnapshot;
//! # use athena_rs::features::connection_pooler::prometheus::PoolMetricsAggregate;
//! # fn example(snapshots: &[ConnectionPoolSnapshot]) -> String {
//! let mut body = String::new();
//! let mut agg = PoolMetricsAggregate::default();
//!
//! ConnectionPoolSnapshot::write_prometheus_preamble(&mut body);
//! for snap in snapshots {
//!     let labels = format!("client=\"{}\"", snap.client_name);
//!     snap.write_prometheus(&mut body, &labels);
//!     agg.observe(snap);
//! }
//! agg.write_totals(&mut body);
//! body
//! # }
//! ```

use chrono::Utc;
use std::fmt::Write;

use crate::features::connection_pooler::ConnectionPoolSnapshot;

/// Accumulator for cross-pool totals emitted at the end of the per-client loop.
///
/// Constructed empty via [`Default`] and fed each per-client snapshot through
/// [`PoolMetricsAggregate::observe`]. The final call to
/// [`PoolMetricsAggregate::write_totals`] flushes the aggregate gauges.
///
/// When accumulated through `observe`, every field is a running sum whose
/// additions saturate at `u64::MAX` instead of wrapping.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PoolMetricsAggregate {
    /// Sum of `pool_size` across all observed snapshots.
    pub total_size: u64,
    /// Sum of `idle_connections` across all observed snapshots.
    pub total_idle: u64,
    /// Sum of `active_connections` across all observed snapshots.
    pub total_active: u64,
    /// Sum of configured `max_connections` across all observed snapshots.
    pub total_max: u64,
}

impl PoolMetricsAggregate {
    /// Add one snapshot's size / idle / active / max counters to the running sums.
    ///
    /// Every addition saturates at `u64::MAX` rather than wrapping.
    pub fn observe(&mut self, snapshot: &ConnectionPoolSnapshot) {
        // Non-capturing, so each call can take a fresh `&mut` to a different field.
        let accumulate = |total: &mut u64, delta: u64| *total = total.saturating_add(delta);

        accumulate(&mut self.total_size, u64::from(snapshot.pool_size));
        accumulate(&mut self.total_idle, u64::from(snapshot.idle_connections));
        accumulate(&mut self.total_active, u64::from(snapshot.active_connections));
        accumulate(&mut self.total_max, u64::from(snapshot.max_connections));
    }

    /// Write the fleet-wide gauges, each preceded by its own `# HELP` / `# TYPE` pair.
    ///
    /// The gauges are written in this order:
    /// 1. `athena_pg_pool_connections_total{state="size"|"idle"|"active"}`
    /// 2. `athena_pg_pool_max_connections_total`
    /// 3. `athena_pg_pool_overall_utilization_ratio`: `active / size`, or `0` for an
    ///    empty fleet, formatted with six decimals.
    /// 4. `athena_pg_pool_headroom_total`: `sum(max) - sum(size)`, floored at `0`.
    ///
    /// Call at most once per scrape so HELP/TYPE are not repeated.
    ///
    /// NOTE: Prometheus naming conventions reserve the `_total` suffix for
    /// counters. These gauge names are kept as-is so existing dashboards keep
    /// working.
    pub fn write_totals(&self, body: &mut String) {
        const CONNECTIONS: &str = "athena_pg_pool_connections_total";
        const MAX_CONNECTIONS: &str = "athena_pg_pool_max_connections_total";
        const UTILIZATION: &str = "athena_pg_pool_overall_utilization_ratio";
        const HEADROOM: &str = "athena_pg_pool_headroom_total";

        write_help_and_type(
            body,
            CONNECTIONS,
            "Aggregated Postgres pool connection counts across all registered client pools.",
            "gauge",
        );
        let per_state = [
            ("size", self.total_size),
            ("idle", self.total_idle),
            ("active", self.total_active),
        ];
        for (state, count) in per_state {
            write_with_labels(body, CONNECTIONS, &format!("state=\"{state}\""), count);
        }

        write_help_and_type(
            body,
            MAX_CONNECTIONS,
            "Sum of configured max_connections across all registered client pools.",
            "gauge",
        );
        write_value(body, MAX_CONNECTIONS, self.total_max);

        write_help_and_type(
            body,
            UTILIZATION,
            "Overall utilization ratio (active/size) across all Postgres pools.",
            "gauge",
        );
        let utilization = match self.total_size {
            0 => 0.0_f64,
            size => self.total_active as f64 / size as f64,
        };
        write_value(body, UTILIZATION, format!("{utilization:.6}"));

        write_help_and_type(
            body,
            HEADROOM,
            "Remaining provisioned capacity across all pools (sum(max) - sum(size)).",
            "gauge",
        );
        write_value(body, HEADROOM, self.total_max.saturating_sub(self.total_size));
    }
}

impl ConnectionPoolSnapshot {
    /// Append this snapshot's per-client gauges to `body` under `labels`.
    ///
    /// `labels` holds the comma-separated `key="value"` pairs identifying the
    /// client (e.g. `client="logging",source="config"`). It may be empty, and a
    /// trailing comma is tolerated. Label values must already be escaped by the
    /// caller. The `# HELP` / `# TYPE` lines are *not* written here. Call
    /// [`Self::write_prometheus_preamble`] once per scrape, before the per-client
    /// loop, so they appear exactly once.
    pub fn write_prometheus(&self, body: &mut String, labels: &str) {
        write_with_labels(body, "athena_pg_pool_up", labels, 1);
        write_with_labels(
            body,
            "athena_pg_pool_closed",
            labels,
            u8::from(self.is_closed),
        );

        write_with_labels(
            body,
            "athena_pg_pool_connections",
            &state_labels(labels, "size"),
            self.pool_size,
        );
        write_with_labels(
            body,
            "athena_pg_pool_connections",
            &state_labels(labels, "idle"),
            self.idle_connections,
        );
        write_with_labels(
            body,
            "athena_pg_pool_connections",
            &state_labels(labels, "active"),
            self.active_connections,
        );

        write_with_labels(
            body,
            "athena_pg_pool_max_connections",
            labels,
            self.max_connections,
        );

        let size = self.pool_size as f64;
        let utilization = ratio(self.active_connections as f64, size);
        let idle_ratio = ratio(self.idle_connections as f64, size);
        let saturation = ratio(size, self.max_connections as f64);
        // Saturating: reports 0 instead of underflowing if size exceeds the ceiling.
        let headroom: u32 = self.max_connections.saturating_sub(self.pool_size);

        write_with_labels(
            body,
            "athena_pg_pool_connection_utilization_ratio",
            labels,
            format!("{utilization:.6}"),
        );
        write_with_labels(
            body,
            "athena_pg_pool_idle_ratio",
            labels,
            format!("{idle_ratio:.6}"),
        );
        write_with_labels(
            body,
            "athena_pg_pool_saturation_ratio",
            labels,
            format!("{saturation:.6}"),
        );
        write_with_labels(body, "athena_pg_pool_headroom", labels, headroom);

        // Negative ages (recorded_at ahead of the local clock) are clamped to 0.
        let age_seconds: f64 =
            (Utc::now() - self.recorded_at).num_milliseconds().max(0) as f64 / 1000.0;
        write_with_labels(
            body,
            "athena_pg_pool_snapshot_age_seconds",
            labels,
            format!("{age_seconds:.6}"),
        );
    }

    /// Emit a zeroed block for a registered client whose pool is absent.
    ///
    /// This keeps label cardinality stable across scrapes, so Prometheus doesn't
    /// see gauges disappear when a pool is torn down. The block writes
    /// `athena_pg_pool_up = 0` and `athena_pg_pool_closed = 1`, and every other
    /// per-client gauge as `0`. That includes `athena_pg_pool_snapshot_age_seconds`,
    /// so consult `athena_pg_pool_up` to tell "absent" from "just captured".
    /// `labels` follows the same rules as in [`Self::write_prometheus`].
    pub fn write_prometheus_absent(body: &mut String, labels: &str) {
        write_with_labels(body, "athena_pg_pool_up", labels, 0);
        write_with_labels(body, "athena_pg_pool_closed", labels, 1);

        for state in ["size", "idle", "active"] {
            write_with_labels(
                body,
                "athena_pg_pool_connections",
                &state_labels(labels, state),
                0,
            );
        }

        // Same emission order as `write_prometheus`.
        for name in [
            "athena_pg_pool_max_connections",
            "athena_pg_pool_connection_utilization_ratio",
            "athena_pg_pool_idle_ratio",
            "athena_pg_pool_saturation_ratio",
            "athena_pg_pool_headroom",
            "athena_pg_pool_snapshot_age_seconds",
        ] {
            write_with_labels(body, name, labels, 0);
        }
    }

    /// Emit the `# HELP` / `# TYPE` preamble covering every per-client gauge
    /// written by [`Self::write_prometheus`] and [`Self::write_prometheus_absent`].
    ///
    /// Call once per scrape before the per-client loop; Prometheus requires
    /// each metric name's HELP/TYPE to appear at most once per exposition.
    pub fn write_prometheus_preamble(body: &mut String) {
        write_help_and_type(
            body,
            "athena_pg_pool_up",
            "Whether a registered Postgres client currently has a live SQLx pool handle.",
            "gauge",
        );
        write_help_and_type(
            body,
            "athena_pg_pool_closed",
            "Whether a registered Postgres client SQLx pool is closed.",
            "gauge",
        );
        write_help_and_type(
            body,
            "athena_pg_pool_connections",
            "Connection counts for registered Postgres client pools by state.",
            "gauge",
        );
        write_help_and_type(
            body,
            "athena_pg_pool_max_connections",
            "Configured upper bound on connections for each Postgres client pool.",
            "gauge",
        );
        write_help_and_type(
            body,
            "athena_pg_pool_connection_utilization_ratio",
            "Utilization ratio (active/size) for each Postgres client pool. Sustained values near 1.0 with growing insert latency often indicate pool starvation or slow reconnect after network blips.",
            "gauge",
        );
        write_help_and_type(
            body,
            "athena_pg_pool_idle_ratio",
            "Idle ratio (idle/size) for each Postgres client pool.",
            "gauge",
        );
        write_help_and_type(
            body,
            "athena_pg_pool_saturation_ratio",
            "Saturation ratio (size/max_connections) showing how close each pool is to its configured ceiling.",
            "gauge",
        );
        write_help_and_type(
            body,
            "athena_pg_pool_headroom",
            "Remaining provisionable connections for each pool (max_connections - size).",
            "gauge",
        );
        write_help_and_type(
            body,
            "athena_pg_pool_snapshot_age_seconds",
            "Age in seconds of the most recent ConnectionPoolSnapshot for each client (0 means just captured; large values indicate a stalled monitor).",
            "gauge",
        );
    }
}

/// Extend a caller-supplied label list with `state="<state>"`.
///
/// Naively writing `{labels},state=...` produces `{,state="..."}` for an empty
/// label list, and `,,` when `labels` ends in a comma. Both are invalid
/// exposition text, so any trailing commas are stripped first and the separator
/// is omitted when nothing remains.
fn state_labels(labels: &str, state: &str) -> String {
    let base = labels.trim_end_matches(',');
    if base.is_empty() {
        format!("state=\"{state}\"")
    } else {
        format!("{base},state=\"{state}\"")
    }
}

/// `numerator / denominator`, or `0.0` when the denominator is not positive.
///
/// Callers pass unsigned counts converted to `f64`, so "not positive" means zero.
fn ratio(numerator: f64, denominator: f64) -> f64 {
    if denominator > 0.0 {
        numerator / denominator
    } else {
        0.0
    }
}

/// Append the `# HELP` and `# TYPE` lines for metric `name`.
///
/// The Prometheus text format requires backslash and line feed inside HELP
/// text to be escaped as `\\` and `\n`. Otherwise a multi-line help string
/// would break the exposition. `metric_type` is written verbatim
/// (e.g. `gauge`).
fn write_help_and_type(body: &mut String, name: &str, help: &str, metric_type: &str) {
    // Writing into a `String` is infallible, so the `fmt::Result`s are discarded.
    let _ = write!(body, "# HELP {name} ");
    for ch in help.chars() {
        match ch {
            '\\' => body.push_str(r"\\"),
            '\n' => body.push_str(r"\n"),
            other => body.push(other),
        }
    }
    body.push('\n');
    let _ = writeln!(body, "# TYPE {name} {metric_type}");
}

/// Append one unlabelled sample line of the form `name value`.
fn write_value(body: &mut String, name: &str, value: impl std::fmt::Display) {
    body.push_str(name);
    body.push(' ');
    // `fmt::Write` on a `String` cannot fail; the `Result` is intentionally ignored.
    let _ = writeln!(body, "{value}");
}

/// Append one labelled sample line of the form `name{labels} value`.
///
/// `labels` is copied verbatim between the braces, so it must already be a
/// well-formed, escaped label list.
fn write_with_labels(body: &mut String, name: &str, labels: &str, value: impl std::fmt::Display) {
    for piece in [name, "{", labels, "} "] {
        body.push_str(piece);
    }
    // `fmt::Write` on a `String` cannot fail; the `Result` is intentionally ignored.
    let _ = writeln!(body, "{value}");
}