1use std::sync::Arc;
2
3use async_trait::async_trait;
4use prometheus::*;
5use serde_json::Value;
6
7use dr_metrix_core::collector::CollectorConfig;
8use dr_metrix_core::error::Result;
9use dr_metrix_core::MetricsCollector;
10
11use crate::pool::PoolAdapter;
12
/// Per-database activity counters and on-disk size, read from `pg_stat_database`.
///
/// Rows whose `datname` is NULL are filtered out so every row can be exported under a
/// `database` label.
// NOTE(review): `pg_database_size` may require CONNECT privilege on each database (or a
// stats-reading role); if the monitoring role lacks it, the whole query could fail — confirm.
const SQL_DB_STATS: &str = "
SELECT datname, numbackends, xact_commit, xact_rollback,
    blks_hit, blks_read, pg_database_size(datname) AS size_bytes
FROM pg_stat_database WHERE datname IS NOT NULL";
17
/// Per-table scan counts and live/dead tuple estimates, read from `pg_stat_user_tables`.
// NOTE(review): `idx_scan` is presumably NULL for tables without indexes; `get_i64` turns a
// NULL or missing value into 0 — confirm that is the desired export.
const SQL_TABLE_STATS: &str = "
SELECT schemaname, relname, seq_scan, idx_scan, n_live_tup, n_dead_tup
FROM pg_stat_user_tables";
21
/// Number of currently granted locks in `pg_locks`, grouped by lock `mode`.
///
/// Modes with no granted locks produce no row at all.
const SQL_LOCKS: &str = "
SELECT mode, COUNT(*) AS cnt FROM pg_locks WHERE granted = true GROUP BY mode";
24
25pub struct PostgresMetrics<P: PoolAdapter> {
26 pool: Arc<P>,
27
28 pool_active: IntGauge,
30 pool_idle: IntGauge,
31 pool_max: IntGauge,
32
33 db_numbackends: IntGaugeVec,
35 db_xact_commit: IntGaugeVec,
36 db_xact_rollback: IntGaugeVec,
37 db_blks_hit: IntGaugeVec,
38 db_blks_read: IntGaugeVec,
39 db_size_bytes: IntGaugeVec,
40
41 table_seq_scan: IntGaugeVec,
43 table_idx_scan: IntGaugeVec,
44 table_live_tup: IntGaugeVec,
45 table_dead_tup: IntGaugeVec,
46
47 lock_count: IntGaugeVec,
49
50 pub query_duration_seconds: HistogramVec,
52 pub rows_returned: HistogramVec,
53}
54
55impl<P: PoolAdapter> Clone for PostgresMetrics<P> {
57 fn clone(&self) -> Self {
58 Self {
59 pool: self.pool.clone(),
60 pool_active: self.pool_active.clone(),
61 pool_idle: self.pool_idle.clone(),
62 pool_max: self.pool_max.clone(),
63 db_numbackends: self.db_numbackends.clone(),
64 db_xact_commit: self.db_xact_commit.clone(),
65 db_xact_rollback: self.db_xact_rollback.clone(),
66 db_blks_hit: self.db_blks_hit.clone(),
67 db_blks_read: self.db_blks_read.clone(),
68 db_size_bytes: self.db_size_bytes.clone(),
69 table_seq_scan: self.table_seq_scan.clone(),
70 table_idx_scan: self.table_idx_scan.clone(),
71 table_live_tup: self.table_live_tup.clone(),
72 table_dead_tup: self.table_dead_tup.clone(),
73 lock_count: self.lock_count.clone(),
74 query_duration_seconds: self.query_duration_seconds.clone(),
75 rows_returned: self.rows_returned.clone(),
76 }
77 }
78}
79
80impl<P: PoolAdapter> PostgresMetrics<P> {
81 pub fn new(pool: P, config: CollectorConfig) -> Result<Self> {
82 let ns = &config.namespace;
83 let sub = "postgres";
84
85 macro_rules! gauge {
86 ($name:expr, $help:expr) => {{
87 let mut opts = Opts::new($name, $help).subsystem(sub);
88 if !ns.is_empty() {
89 opts = opts.namespace(ns.as_str());
90 }
91 IntGauge::with_opts(opts)?
92 }};
93 }
94
95 macro_rules! gauge_vec {
96 ($name:expr, $help:expr, $labels:expr) => {{
97 let mut opts = Opts::new($name, $help).subsystem(sub);
98 if !ns.is_empty() {
99 opts = opts.namespace(ns.as_str());
100 }
101 IntGaugeVec::new(opts, $labels)?
102 }};
103 }
104
105 macro_rules! histogram_vec {
106 ($name:expr, $help:expr, $labels:expr, $buckets:expr) => {{
107 let mut opts = HistogramOpts::new($name, $help)
108 .subsystem(sub)
109 .buckets($buckets);
110 if !ns.is_empty() {
111 opts = opts.namespace(ns.as_str());
112 }
113 HistogramVec::new(opts, $labels)?
114 }};
115 }
116
117 Ok(Self {
118 pool: Arc::new(pool),
119 pool_active: gauge!("pool_active", "Active pool connections"),
120 pool_idle: gauge!("pool_idle", "Idle pool connections"),
121 pool_max: gauge!("pool_max", "Maximum pool size"),
122 db_numbackends: gauge_vec!(
123 "db_numbackends",
124 "Active backends per database",
125 &["database"]
126 ),
127 db_xact_commit: gauge_vec!(
128 "db_xact_commit_total",
129 "Committed transactions (snapshot)",
130 &["database"]
131 ),
132 db_xact_rollback: gauge_vec!(
133 "db_xact_rollback_total",
134 "Rolled-back transactions (snapshot)",
135 &["database"]
136 ),
137 db_blks_hit: gauge_vec!(
138 "db_blks_hit_total",
139 "Buffer hits (snapshot)",
140 &["database"]
141 ),
142 db_blks_read: gauge_vec!(
143 "db_blks_read_total",
144 "Disk blocks read (snapshot)",
145 &["database"]
146 ),
147 db_size_bytes: gauge_vec!("db_size_bytes", "Database size in bytes", &["database"]),
148 table_seq_scan: gauge_vec!(
149 "table_seq_scan_total",
150 "Sequential scans (snapshot)",
151 &["schema", "table"]
152 ),
153 table_idx_scan: gauge_vec!(
154 "table_idx_scan_total",
155 "Index scans (snapshot)",
156 &["schema", "table"]
157 ),
158 table_live_tup: gauge_vec!(
159 "table_live_tup",
160 "Estimated live rows",
161 &["schema", "table"]
162 ),
163 table_dead_tup: gauge_vec!(
164 "table_dead_tup",
165 "Estimated dead rows",
166 &["schema", "table"]
167 ),
168 lock_count: gauge_vec!("lock_count", "Active locks by mode", &["mode"]),
169 query_duration_seconds: histogram_vec!(
170 "query_duration_seconds",
171 "Application query duration",
172 &["query"],
173 dr_metrix_core::DEFAULT_QUERY_BUCKETS.to_vec()
174 ),
175 rows_returned: histogram_vec!(
176 "rows_returned",
177 "Rows returned per application query",
178 &["query"],
179 vec![1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0]
180 ),
181 })
182 }
183
184 pub fn observe_query(&self, name: &str, duration_secs: f64, row_count: f64) {
185 self.query_duration_seconds
186 .with_label_values(&[name])
187 .observe(duration_secs);
188 self.rows_returned
189 .with_label_values(&[name])
190 .observe(row_count);
191 }
192}
193
194fn get_i64(row: &serde_json::Map<String, Value>, key: &str) -> i64 {
195 row.get(key)
196 .and_then(|v| v.as_i64().or_else(|| v.as_f64().map(|f| f as i64)))
197 .unwrap_or(0)
198}
199
200fn get_str<'a>(row: &'a serde_json::Map<String, Value>, key: &str) -> &'a str {
201 row.get(key).and_then(|v| v.as_str()).unwrap_or("")
202}
203
204#[async_trait]
205impl<P: PoolAdapter> MetricsCollector for PostgresMetrics<P> {
206 fn name(&self) -> &'static str {
207 "postgres"
208 }
209
210 fn register(&self, registry: &Registry) -> Result<()> {
211 registry.register(Box::new(self.pool_active.clone()))?;
212 registry.register(Box::new(self.pool_idle.clone()))?;
213 registry.register(Box::new(self.pool_max.clone()))?;
214 registry.register(Box::new(self.db_numbackends.clone()))?;
215 registry.register(Box::new(self.db_xact_commit.clone()))?;
216 registry.register(Box::new(self.db_xact_rollback.clone()))?;
217 registry.register(Box::new(self.db_blks_hit.clone()))?;
218 registry.register(Box::new(self.db_blks_read.clone()))?;
219 registry.register(Box::new(self.db_size_bytes.clone()))?;
220 registry.register(Box::new(self.table_seq_scan.clone()))?;
221 registry.register(Box::new(self.table_idx_scan.clone()))?;
222 registry.register(Box::new(self.table_live_tup.clone()))?;
223 registry.register(Box::new(self.table_dead_tup.clone()))?;
224 registry.register(Box::new(self.lock_count.clone()))?;
225 registry.register(Box::new(self.query_duration_seconds.clone()))?;
226 registry.register(Box::new(self.rows_returned.clone()))?;
227 Ok(())
228 }
229
230 async fn collect(&self) -> Result<()> {
231 let status = self.pool.pool_status();
233 self.pool_active.set(status.in_use as i64);
234 self.pool_idle.set(status.available as i64);
235 self.pool_max.set(status.max_size as i64);
236
237 let db_rows = self.pool.query_json(SQL_DB_STATS).await?;
239 for row in &db_rows {
240 let db = get_str(row, "datname");
241 self.db_numbackends
242 .with_label_values(&[db])
243 .set(get_i64(row, "numbackends"));
244 self.db_xact_commit
245 .with_label_values(&[db])
246 .set(get_i64(row, "xact_commit"));
247 self.db_xact_rollback
248 .with_label_values(&[db])
249 .set(get_i64(row, "xact_rollback"));
250 self.db_blks_hit
251 .with_label_values(&[db])
252 .set(get_i64(row, "blks_hit"));
253 self.db_blks_read
254 .with_label_values(&[db])
255 .set(get_i64(row, "blks_read"));
256 self.db_size_bytes
257 .with_label_values(&[db])
258 .set(get_i64(row, "size_bytes"));
259 }
260
261 let table_rows = self.pool.query_json(SQL_TABLE_STATS).await?;
263 for row in &table_rows {
264 let schema = get_str(row, "schemaname");
265 let table = get_str(row, "relname");
266 self.table_seq_scan
267 .with_label_values(&[schema, table])
268 .set(get_i64(row, "seq_scan"));
269 self.table_idx_scan
270 .with_label_values(&[schema, table])
271 .set(get_i64(row, "idx_scan"));
272 self.table_live_tup
273 .with_label_values(&[schema, table])
274 .set(get_i64(row, "n_live_tup"));
275 self.table_dead_tup
276 .with_label_values(&[schema, table])
277 .set(get_i64(row, "n_dead_tup"));
278 }
279
280 let lock_rows = self.pool.query_json(SQL_LOCKS).await?;
282 for row in &lock_rows {
283 let mode = get_str(row, "mode");
284 self.lock_count
285 .with_label_values(&[mode])
286 .set(get_i64(row, "cnt"));
287 }
288
289 Ok(())
290 }
291}