// dr_metrix_postgres/metrics.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use prometheus::*;
5use serde_json::Value;
6
7use dr_metrix_core::collector::CollectorConfig;
8use dr_metrix_core::error::Result;
9use dr_metrix_core::MetricsCollector;
10
11use crate::pool::PoolAdapter;
12
/// Per-database statistics query.
///
/// Reads one row per named database from `pg_stat_database` (rows whose
/// `datname` is NULL are filtered out) and adds the on-disk size via
/// `pg_database_size(datname)`, exposed as `size_bytes`.
const SQL_DB_STATS: &str = "
SELECT datname, numbackends, xact_commit, xact_rollback,
       blks_hit, blks_read, pg_database_size(datname) AS size_bytes
FROM pg_stat_database WHERE datname IS NOT NULL";
17
/// Per-table statistics query.
///
/// Reads scan counters (`seq_scan`, `idx_scan`) and row estimates
/// (`n_live_tup`, `n_dead_tup`) for every row of `pg_stat_user_tables`,
/// keyed by `schemaname` and `relname`.
const SQL_TABLE_STATS: &str = "
SELECT schemaname, relname, seq_scan, idx_scan, n_live_tup, n_dead_tup
FROM pg_stat_user_tables";
21
/// Lock summary query.
///
/// Counts granted locks in `pg_locks`, grouped by lock `mode`; the count is
/// exposed as `cnt`. Modes with no granted locks produce no row.
const SQL_LOCKS: &str = "
SELECT mode, COUNT(*) AS cnt FROM pg_locks WHERE granted = true GROUP BY mode";
24
25pub struct PostgresMetrics<P: PoolAdapter> {
26    pool: Arc<P>,
27
28    // Pool gauges
29    pool_active: IntGauge,
30    pool_idle: IntGauge,
31    pool_max: IntGauge,
32
33    // pg_stat_database
34    db_numbackends: IntGaugeVec,
35    db_xact_commit: IntGaugeVec,
36    db_xact_rollback: IntGaugeVec,
37    db_blks_hit: IntGaugeVec,
38    db_blks_read: IntGaugeVec,
39    db_size_bytes: IntGaugeVec,
40
41    // pg_stat_user_tables
42    table_seq_scan: IntGaugeVec,
43    table_idx_scan: IntGaugeVec,
44    table_live_tup: IntGaugeVec,
45    table_dead_tup: IntGaugeVec,
46
47    // pg_locks
48    lock_count: IntGaugeVec,
49
50    // App-level
51    pub query_duration_seconds: HistogramVec,
52    pub rows_returned: HistogramVec,
53}
54
55// Manual Clone so we don't require P: Clone
56impl<P: PoolAdapter> Clone for PostgresMetrics<P> {
57    fn clone(&self) -> Self {
58        Self {
59            pool: self.pool.clone(),
60            pool_active: self.pool_active.clone(),
61            pool_idle: self.pool_idle.clone(),
62            pool_max: self.pool_max.clone(),
63            db_numbackends: self.db_numbackends.clone(),
64            db_xact_commit: self.db_xact_commit.clone(),
65            db_xact_rollback: self.db_xact_rollback.clone(),
66            db_blks_hit: self.db_blks_hit.clone(),
67            db_blks_read: self.db_blks_read.clone(),
68            db_size_bytes: self.db_size_bytes.clone(),
69            table_seq_scan: self.table_seq_scan.clone(),
70            table_idx_scan: self.table_idx_scan.clone(),
71            table_live_tup: self.table_live_tup.clone(),
72            table_dead_tup: self.table_dead_tup.clone(),
73            lock_count: self.lock_count.clone(),
74            query_duration_seconds: self.query_duration_seconds.clone(),
75            rows_returned: self.rows_returned.clone(),
76        }
77    }
78}
79
80impl<P: PoolAdapter> PostgresMetrics<P> {
81    pub fn new(pool: P, config: CollectorConfig) -> Result<Self> {
82        let ns = &config.namespace;
83        let sub = "postgres";
84
85        macro_rules! gauge {
86            ($name:expr, $help:expr) => {{
87                let mut opts = Opts::new($name, $help).subsystem(sub);
88                if !ns.is_empty() {
89                    opts = opts.namespace(ns.as_str());
90                }
91                IntGauge::with_opts(opts)?
92            }};
93        }
94
95        macro_rules! gauge_vec {
96            ($name:expr, $help:expr, $labels:expr) => {{
97                let mut opts = Opts::new($name, $help).subsystem(sub);
98                if !ns.is_empty() {
99                    opts = opts.namespace(ns.as_str());
100                }
101                IntGaugeVec::new(opts, $labels)?
102            }};
103        }
104
105        macro_rules! histogram_vec {
106            ($name:expr, $help:expr, $labels:expr, $buckets:expr) => {{
107                let mut opts = HistogramOpts::new($name, $help)
108                    .subsystem(sub)
109                    .buckets($buckets);
110                if !ns.is_empty() {
111                    opts = opts.namespace(ns.as_str());
112                }
113                HistogramVec::new(opts, $labels)?
114            }};
115        }
116
117        Ok(Self {
118            pool: Arc::new(pool),
119            pool_active: gauge!("pool_active", "Active pool connections"),
120            pool_idle: gauge!("pool_idle", "Idle pool connections"),
121            pool_max: gauge!("pool_max", "Maximum pool size"),
122            db_numbackends: gauge_vec!(
123                "db_numbackends",
124                "Active backends per database",
125                &["database"]
126            ),
127            db_xact_commit: gauge_vec!(
128                "db_xact_commit_total",
129                "Committed transactions (snapshot)",
130                &["database"]
131            ),
132            db_xact_rollback: gauge_vec!(
133                "db_xact_rollback_total",
134                "Rolled-back transactions (snapshot)",
135                &["database"]
136            ),
137            db_blks_hit: gauge_vec!(
138                "db_blks_hit_total",
139                "Buffer hits (snapshot)",
140                &["database"]
141            ),
142            db_blks_read: gauge_vec!(
143                "db_blks_read_total",
144                "Disk blocks read (snapshot)",
145                &["database"]
146            ),
147            db_size_bytes: gauge_vec!("db_size_bytes", "Database size in bytes", &["database"]),
148            table_seq_scan: gauge_vec!(
149                "table_seq_scan_total",
150                "Sequential scans (snapshot)",
151                &["schema", "table"]
152            ),
153            table_idx_scan: gauge_vec!(
154                "table_idx_scan_total",
155                "Index scans (snapshot)",
156                &["schema", "table"]
157            ),
158            table_live_tup: gauge_vec!(
159                "table_live_tup",
160                "Estimated live rows",
161                &["schema", "table"]
162            ),
163            table_dead_tup: gauge_vec!(
164                "table_dead_tup",
165                "Estimated dead rows",
166                &["schema", "table"]
167            ),
168            lock_count: gauge_vec!("lock_count", "Active locks by mode", &["mode"]),
169            query_duration_seconds: histogram_vec!(
170                "query_duration_seconds",
171                "Application query duration",
172                &["query"],
173                dr_metrix_core::DEFAULT_QUERY_BUCKETS.to_vec()
174            ),
175            rows_returned: histogram_vec!(
176                "rows_returned",
177                "Rows returned per application query",
178                &["query"],
179                vec![1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0]
180            ),
181        })
182    }
183
184    pub fn observe_query(&self, name: &str, duration_secs: f64, row_count: f64) {
185        self.query_duration_seconds
186            .with_label_values(&[name])
187            .observe(duration_secs);
188        self.rows_returned
189            .with_label_values(&[name])
190            .observe(row_count);
191    }
192}
193
194fn get_i64(row: &serde_json::Map<String, Value>, key: &str) -> i64 {
195    row.get(key)
196        .and_then(|v| v.as_i64().or_else(|| v.as_f64().map(|f| f as i64)))
197        .unwrap_or(0)
198}
199
200fn get_str<'a>(row: &'a serde_json::Map<String, Value>, key: &str) -> &'a str {
201    row.get(key).and_then(|v| v.as_str()).unwrap_or("")
202}
203
204#[async_trait]
205impl<P: PoolAdapter> MetricsCollector for PostgresMetrics<P> {
206    fn name(&self) -> &'static str {
207        "postgres"
208    }
209
210    fn register(&self, registry: &Registry) -> Result<()> {
211        registry.register(Box::new(self.pool_active.clone()))?;
212        registry.register(Box::new(self.pool_idle.clone()))?;
213        registry.register(Box::new(self.pool_max.clone()))?;
214        registry.register(Box::new(self.db_numbackends.clone()))?;
215        registry.register(Box::new(self.db_xact_commit.clone()))?;
216        registry.register(Box::new(self.db_xact_rollback.clone()))?;
217        registry.register(Box::new(self.db_blks_hit.clone()))?;
218        registry.register(Box::new(self.db_blks_read.clone()))?;
219        registry.register(Box::new(self.db_size_bytes.clone()))?;
220        registry.register(Box::new(self.table_seq_scan.clone()))?;
221        registry.register(Box::new(self.table_idx_scan.clone()))?;
222        registry.register(Box::new(self.table_live_tup.clone()))?;
223        registry.register(Box::new(self.table_dead_tup.clone()))?;
224        registry.register(Box::new(self.lock_count.clone()))?;
225        registry.register(Box::new(self.query_duration_seconds.clone()))?;
226        registry.register(Box::new(self.rows_returned.clone()))?;
227        Ok(())
228    }
229
230    async fn collect(&self) -> Result<()> {
231        // Pool status
232        let status = self.pool.pool_status();
233        self.pool_active.set(status.in_use as i64);
234        self.pool_idle.set(status.available as i64);
235        self.pool_max.set(status.max_size as i64);
236
237        // pg_stat_database
238        let db_rows = self.pool.query_json(SQL_DB_STATS).await?;
239        for row in &db_rows {
240            let db = get_str(row, "datname");
241            self.db_numbackends
242                .with_label_values(&[db])
243                .set(get_i64(row, "numbackends"));
244            self.db_xact_commit
245                .with_label_values(&[db])
246                .set(get_i64(row, "xact_commit"));
247            self.db_xact_rollback
248                .with_label_values(&[db])
249                .set(get_i64(row, "xact_rollback"));
250            self.db_blks_hit
251                .with_label_values(&[db])
252                .set(get_i64(row, "blks_hit"));
253            self.db_blks_read
254                .with_label_values(&[db])
255                .set(get_i64(row, "blks_read"));
256            self.db_size_bytes
257                .with_label_values(&[db])
258                .set(get_i64(row, "size_bytes"));
259        }
260
261        // pg_stat_user_tables
262        let table_rows = self.pool.query_json(SQL_TABLE_STATS).await?;
263        for row in &table_rows {
264            let schema = get_str(row, "schemaname");
265            let table = get_str(row, "relname");
266            self.table_seq_scan
267                .with_label_values(&[schema, table])
268                .set(get_i64(row, "seq_scan"));
269            self.table_idx_scan
270                .with_label_values(&[schema, table])
271                .set(get_i64(row, "idx_scan"));
272            self.table_live_tup
273                .with_label_values(&[schema, table])
274                .set(get_i64(row, "n_live_tup"));
275            self.table_dead_tup
276                .with_label_values(&[schema, table])
277                .set(get_i64(row, "n_dead_tup"));
278        }
279
280        // pg_locks
281        let lock_rows = self.pool.query_json(SQL_LOCKS).await?;
282        for row in &lock_rows {
283            let mode = get_str(row, "mode");
284            self.lock_count
285                .with_label_values(&[mode])
286                .set(get_i64(row, "cnt"));
287        }
288
289        Ok(())
290    }
291}