1use std::{
2 collections::HashMap,
3 time::Duration,
4 {env, fmt},
5};
6pub mod diagnose;
7pub mod queries;
8
9#[cfg(feature = "web")]
10pub mod web;
11
12pub use queries::{
13 all_locks::AllLocks,
14 bloat::Bloat,
15 blocking::Blocking,
16 buffercache_stats::BuffercacheStats,
17 buffercache_usage::BuffercacheUsage,
18 cache_hit::CacheHit,
19 calls::Calls,
20 connections::Connections,
21 db_settings::DbSettings,
22 duplicate_indexes::DuplicateIndexes,
23 extensions::Extensions,
24 index_cache_hit::IndexCacheHit,
25 index_scans::IndexScans,
26 index_size::IndexSize,
27 index_usage::IndexUsage,
28 indexes::Indexes,
29 locks::Locks,
30 long_running_queries::LongRunningQueries,
31 mandelbrot::Mandelbrot,
32 null_indexes::NullIndexes,
33 outliers::Outliers,
34 records_rank::RecordsRank,
35 seq_scans::SeqScans,
36 shared::{get_default_schema, Query},
37 ssl_used::SslUsed,
38 table_cache_hit::TableCacheHit,
39 table_index_scans::TableIndexScans,
40 table_indexes_size::TableIndexesSize,
41 table_size::TableSize,
42 tables::Tables,
43 total_index_size::TotalIndexSize,
44 total_table_size::TotalTableSize,
45 unused_indexes::UnusedIndexes,
46 vacuum_stats::VacuumStats,
47};
48use semver::Version;
49use sqlx::{postgres::PgPoolOptions, Pool, Postgres, Row};
50
51#[macro_use]
52extern crate prettytable;
53use prettytable::{Cell, Row as TableRow, Table};
54
55pub fn render_table<T: Query>(items: Vec<T>) {
57 let mut table = Table::new();
58 table.add_row(T::headers());
59
60 let columns_count = T::headers().len();
61
62 for item in items {
63 table.add_row(item.to_row());
64 }
65 table.set_titles(TableRow::new(vec![
66 Cell::new(T::description().as_str()).style_spec(format!("H{}", columns_count).as_str())
67 ]));
68 table.printstd();
69}
70
71pub async fn bloat(pool: &Pool<Postgres>) -> Result<Vec<Bloat>, PgExtrasError> {
73 get_rows(None, pool).await
74}
75
76pub async fn blocking(
78 limit: Option<String>,
79 pool: &Pool<Postgres>,
80) -> Result<Vec<Blocking>, PgExtrasError> {
81 get_rows(Some(limit_params(limit)), pool).await
82}
83
84pub async fn pg_pool() -> Result<Pool<Postgres>, PgExtrasError> {
88 match PgPoolOptions::new()
89 .max_connections(5)
90 .acquire_timeout(Duration::from_secs(10))
91 .connect(db_url()?.as_str())
92 .await
93 {
94 Ok(pool) => Ok(pool),
95 Err(e) => Err(PgExtrasError::DbConnectionError(format!("{}", e))),
96 }
97}
98
99pub async fn calls(
101 limit: Option<String>,
102 pool: &Pool<Postgres>,
103) -> Result<Vec<Calls>, PgExtrasError> {
104 get_rows(Some(limit_params(limit)), pool).await
105}
106
107pub async fn extensions(pool: &Pool<Postgres>) -> Result<Vec<Extensions>, PgExtrasError> {
109 get_rows(None, pool).await
110}
111
112pub async fn table_cache_hit(pool: &Pool<Postgres>) -> Result<Vec<TableCacheHit>, PgExtrasError> {
114 get_rows(None, pool).await
115}
116
117pub async fn tables(
119 schema: Option<String>,
120 pool: &Pool<Postgres>,
121) -> Result<Vec<Tables>, PgExtrasError> {
122 get_rows(Some(schema_params(schema)), pool).await
123}
124
125pub async fn index_cache_hit(
127 schema: Option<String>,
128 pool: &Pool<Postgres>,
129) -> Result<Vec<IndexCacheHit>, PgExtrasError> {
130 get_rows(Some(schema_params(schema)), pool).await
131}
132
133pub async fn indexes(pool: &Pool<Postgres>) -> Result<Vec<Indexes>, PgExtrasError> {
135 get_rows(None, pool).await
136}
137
138pub async fn index_size(pool: &Pool<Postgres>) -> Result<Vec<IndexSize>, PgExtrasError> {
140 get_rows(None, pool).await
141}
142
143pub async fn index_usage(
145 schema: Option<String>,
146 pool: &Pool<Postgres>,
147) -> Result<Vec<IndexUsage>, PgExtrasError> {
148 get_rows(Some(schema_params(schema)), pool).await
149}
150
151pub async fn index_scans(
153 schema: Option<String>,
154 pool: &Pool<Postgres>,
155) -> Result<Vec<IndexScans>, PgExtrasError> {
156 get_rows(Some(schema_params(schema)), pool).await
157}
158
159pub async fn null_indexes(
161 min_relation_size_mb: Option<String>,
162 pool: &Pool<Postgres>,
163) -> Result<Vec<NullIndexes>, PgExtrasError> {
164 let min_relation_size_mb = min_relation_size_mb.unwrap_or("10".to_string());
165
166 let params: HashMap<String, String> = [(
167 "min_relation_size_mb".to_string(),
168 min_relation_size_mb.to_string(),
169 )]
170 .iter()
171 .cloned()
172 .collect();
173 get_rows(Some(params), pool).await
174}
175
176pub async fn locks(pool: &Pool<Postgres>) -> Result<Vec<Locks>, PgExtrasError> {
178 get_rows(None, pool).await
179}
180
181pub async fn all_locks(pool: &Pool<Postgres>) -> Result<Vec<AllLocks>, PgExtrasError> {
183 get_rows(None, pool).await
184}
185
186pub async fn long_running_queries(
188 pool: &Pool<Postgres>,
189) -> Result<Vec<LongRunningQueries>, PgExtrasError> {
190 get_rows(None, pool).await
191}
192
193pub async fn mandelbrot(pool: &Pool<Postgres>) -> Result<Vec<Mandelbrot>, PgExtrasError> {
195 get_rows(None, pool).await
196}
197
198pub async fn outliers(pool: &Pool<Postgres>) -> Result<Vec<Outliers>, PgExtrasError> {
200 get_rows(None, pool).await
201}
202
203pub async fn records_rank(
205 schema: Option<String>,
206 pool: &Pool<Postgres>,
207) -> Result<Vec<RecordsRank>, PgExtrasError> {
208 get_rows(Some(schema_params(schema)), pool).await
209}
210
211pub async fn seq_scans(
213 schema: Option<String>,
214 pool: &Pool<Postgres>,
215) -> Result<Vec<SeqScans>, PgExtrasError> {
216 get_rows(Some(schema_params(schema)), pool).await
217}
218
219pub async fn table_index_scans(
221 schema: Option<String>,
222 pool: &Pool<Postgres>,
223) -> Result<Vec<TableIndexScans>, PgExtrasError> {
224 get_rows(Some(schema_params(schema)), pool).await
225}
226
227pub async fn table_indexes_size(
229 schema: Option<String>,
230 pool: &Pool<Postgres>,
231) -> Result<Vec<TableIndexesSize>, PgExtrasError> {
232 get_rows(Some(schema_params(schema)), pool).await
233}
234
235pub async fn table_size(pool: &Pool<Postgres>) -> Result<Vec<TableSize>, PgExtrasError> {
237 get_rows(None, pool).await
238}
239
240pub async fn total_index_size(pool: &Pool<Postgres>) -> Result<Vec<TotalIndexSize>, PgExtrasError> {
242 get_rows(None, pool).await
243}
244
245pub async fn total_table_size(pool: &Pool<Postgres>) -> Result<Vec<TotalTableSize>, PgExtrasError> {
247 get_rows(None, pool).await
248}
249
250pub async fn unused_indexes(
252 schema: Option<String>,
253 pool: &Pool<Postgres>,
254) -> Result<Vec<UnusedIndexes>, PgExtrasError> {
255 get_rows(Some(schema_params(schema)), pool).await
256}
257
258pub async fn duplicate_indexes(
260 pool: &Pool<Postgres>,
261) -> Result<Vec<DuplicateIndexes>, PgExtrasError> {
262 get_rows(None, pool).await
263}
264
265pub async fn vacuum_stats(pool: &Pool<Postgres>) -> Result<Vec<VacuumStats>, PgExtrasError> {
267 get_rows(None, pool).await
268}
269
270pub async fn buffercache_stats(
272 pool: &Pool<Postgres>,
273) -> Result<Vec<BuffercacheStats>, PgExtrasError> {
274 get_rows(None, pool).await
275}
276
277pub async fn buffercache_usage(
279 pool: &Pool<Postgres>,
280) -> Result<Vec<BuffercacheUsage>, PgExtrasError> {
281 get_rows(None, pool).await
282}
283
284pub async fn ssl_used(pool: &Pool<Postgres>) -> Result<Vec<SslUsed>, PgExtrasError> {
286 get_rows(None, pool).await
287}
288
289pub async fn connections(pool: &Pool<Postgres>) -> Result<Vec<Connections>, PgExtrasError> {
291 get_rows(None, pool).await
292}
293
294pub async fn cache_hit(
296 schema: Option<String>,
297 pool: &Pool<Postgres>,
298) -> Result<Vec<CacheHit>, PgExtrasError> {
299 get_rows(Some(schema_params(schema)), pool).await
300}
301
302pub async fn db_settings(pool: &Pool<Postgres>) -> Result<Vec<DbSettings>, PgExtrasError> {
304 get_rows(None, pool).await
305}
306
307pub async fn diagnose(pool: &Pool<Postgres>) -> Result<Vec<CheckResult>, PgExtrasError> {
309 run_diagnose(pool).await
310}
311
/// Errors returned by this crate's public functions.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum PgExtrasError {
    /// No database URL could be read from `$PG_EXTRAS_DATABASE_URL` or `$DATABASE_URL`.
    MissingConfigVars(),
    /// Connecting to the database failed; carries the underlying error message.
    DbConnectionError(String),
    /// Any other failure, such as a query that could not be executed; carries the message.
    Other(String),
}

impl fmt::Display for PgExtrasError {
    /// Writes the user-facing message for each variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Write straight into the formatter: no intermediate `String` and no
        // borrow of a temporary escaping a match arm.
        match self {
            Self::MissingConfigVars() => {
                f.write_str("Both $DATABASE_URL and $PG_EXTRAS_DATABASE_URL are not set.")
            }
            Self::DbConnectionError(e) => write!(f, "Cannot connect to database: '{}'", e),
            Self::Other(e) => f.write_str(e),
        }
    }
}

impl std::error::Error for PgExtrasError {}
335
336use crate::diagnose::run::{run_diagnose, CheckResult};
337use lazy_static::lazy_static;
338
339lazy_static! {
340 pub static ref NEW_PG_STAT_STATEMENTS: Version = Version::parse("1.8.0").unwrap();
341 pub static ref PG_STAT_STATEMENTS_17: Version = semver::Version::parse("1.11.0").unwrap();
342}
343
/// Classification of the installed `pg_stat_statements` version, passed to
/// `Query::read_file` when loading a query's SQL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PgStatsVersion {
    /// Installed version is older than `NEW_PG_STAT_STATEMENTS` (1.8.0).
    Legacy,
    /// Installed version is at least 1.8.0 and below 1.11.0, or could not be determined.
    Standard,
    /// Installed version is `PG_STAT_STATEMENTS_17` (1.11.0) or newer.
    Pg17,
}
350
351async fn get_rows<T: Query>(
352 params: Option<HashMap<String, String>>,
353 pool: &Pool<Postgres>,
354) -> Result<Vec<T>, PgExtrasError> {
355 let pg_statements_query =
356 "select installed_version from pg_available_extensions where name='pg_stat_statements'";
357
358 let pg_statements_version = match sqlx::query(pg_statements_query).fetch_one(pool).await {
359 Ok(row) => row
360 .try_get::<String, _>("installed_version")
361 .unwrap_or_default(),
362 Err(_) => "".to_string(),
363 };
364
365 let default_version = NEW_PG_STAT_STATEMENTS.clone();
366 let pg_statements_version = format!("{}.0", pg_statements_version);
367 let pg_statements_version =
368 Version::parse(&pg_statements_version).unwrap_or(default_version.clone());
369
370 let pg_statements_version = if pg_statements_version < default_version {
371 PgStatsVersion::Legacy
372 } else if pg_statements_version >= *PG_STAT_STATEMENTS_17 {
373 PgStatsVersion::Pg17
374 } else {
375 PgStatsVersion::Standard
376 };
377
378 let mut query = T::read_file(Some(pg_statements_version));
379
380 if let Some(params) = params {
381 for (key, value) in ¶ms {
382 query = query.replace(&format!("%{{{}}}", key), value.as_str());
383 }
384 }
385
386 Ok(match sqlx::query(&query).fetch_all(pool).await {
387 Ok(rows) => rows.iter().map(T::new).collect(),
388 Err(e) => return Err(PgExtrasError::Other(format!("{}", e))),
389 })
390}
391
392fn db_url() -> Result<String, PgExtrasError> {
393 env::var("PG_EXTRAS_DATABASE_URL")
394 .or_else(|_| env::var("DATABASE_URL"))
395 .map_err(|_| PgExtrasError::MissingConfigVars())
396}
397
398fn schema_params(schema_name: Option<String>) -> HashMap<String, String> {
399 let schema_name = schema_name.unwrap_or(get_default_schema());
400 [("schema".to_string(), schema_name.to_string())]
401 .iter()
402 .cloned()
403 .collect()
404}
405
/// Builds the single-entry parameter map `{"limit": <value>}` used for
/// `%{limit}` substitution.
///
/// `limit` is used as given; `None` falls back to `"10"`.
fn limit_params(limit: Option<String>) -> HashMap<String, String> {
    // Allocate the default only when the caller did not supply a value.
    let limit = limit.unwrap_or_else(|| "10".to_string());
    HashMap::from([("limit".to_string(), limit)])
}
413
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnose::report::render_diagnose_report;

    /// Points `PG_EXTRAS_DATABASE_URL` at the test database for the Postgres
    /// version named by `$PG_VERSION` and makes sure the extensions the
    /// queries rely on are installed.
    async fn setup() -> Result<(), Box<dyn std::error::Error>> {
        // Each Postgres version listens on its own port; unknown versions use 5432.
        let port = match env::var("PG_VERSION").expect("PG_VERSION not set").as_str() {
            "12" => "5432",
            "13" => "5433",
            "14" => "5434",
            "15" => "5435",
            "16" => "5436",
            "17" => "5437",
            _ => "5432",
        };

        env::set_var(
            "PG_EXTRAS_DATABASE_URL",
            format!(
                "postgres://postgres:secret@localhost:{}/pg-extras-rs-test",
                port
            ),
        );

        let pool = PgPoolOptions::new()
            .max_connections(5)
            .connect(db_url()?.as_str())
            .await?;

        for extension in ["sslinfo", "pg_stat_statements", "pg_buffercache"] {
            let query = format!("CREATE EXTENSION IF NOT EXISTS {};", extension);
            sqlx::query(&query).execute(&pool).await?;
        }

        Ok(())
    }

    /// Smoke test: every public query runs against the test database and renders.
    #[tokio::test]
    async fn it_works() -> Result<(), Box<dyn std::error::Error>> {
        setup().await?;

        let pool = pg_pool().await?;

        render_table(cache_hit(None, &pool).await?);
        render_table(bloat(&pool).await?);
        render_table(blocking(None, &pool).await?);
        render_table(calls(None, &pool).await?);
        render_table(extensions(&pool).await?);
        render_table(table_cache_hit(&pool).await?);
        render_table(seq_scans(None, &pool).await?);
        render_table(table_index_scans(None, &pool).await?);
        render_table(table_indexes_size(None, &pool).await?);
        render_table(tables(None, &pool).await?);
        render_table(index_cache_hit(None, &pool).await?);
        render_table(indexes(&pool).await?);
        render_table(index_size(&pool).await?);
        render_table(index_usage(None, &pool).await?);
        render_table(index_scans(None, &pool).await?);
        render_table(null_indexes(None, &pool).await?);
        render_table(locks(&pool).await?);
        render_table(all_locks(&pool).await?);
        render_table(long_running_queries(&pool).await?);
        render_table(mandelbrot(&pool).await?);
        render_table(outliers(&pool).await?);
        render_table(records_rank(None, &pool).await?);
        render_table(table_size(&pool).await?);
        render_table(total_index_size(&pool).await?);
        render_table(total_table_size(&pool).await?);
        render_table(unused_indexes(None, &pool).await?);
        render_table(duplicate_indexes(&pool).await?);
        render_table(vacuum_stats(&pool).await?);
        render_table(buffercache_stats(&pool).await?);
        render_table(buffercache_usage(&pool).await?);
        render_table(ssl_used(&pool).await?);
        render_table(connections(&pool).await?);
        render_table(db_settings(&pool).await?);
        render_diagnose_report(diagnose(&pool).await?);

        Ok(())
    }

    /// Compile-time check that every public result type (and the error type)
    /// is `Send + Sync + Unpin`.
    #[test]
    fn normal_types() {
        fn is_normal<T: Sized + Send + Sync + Unpin>() {}

        // Each type is listed once; `BuffercacheUsage` was previously missing.
        is_normal::<Bloat>();
        is_normal::<Blocking>();
        is_normal::<Calls>();
        is_normal::<Extensions>();
        is_normal::<TableCacheHit>();
        is_normal::<Tables>();
        is_normal::<IndexCacheHit>();
        is_normal::<Indexes>();
        is_normal::<IndexSize>();
        is_normal::<IndexUsage>();
        is_normal::<IndexScans>();
        is_normal::<NullIndexes>();
        is_normal::<Locks>();
        is_normal::<AllLocks>();
        is_normal::<LongRunningQueries>();
        is_normal::<Mandelbrot>();
        is_normal::<Outliers>();
        is_normal::<RecordsRank>();
        is_normal::<SeqScans>();
        is_normal::<TableIndexScans>();
        is_normal::<TableIndexesSize>();
        is_normal::<TableSize>();
        is_normal::<TotalIndexSize>();
        is_normal::<TotalTableSize>();
        is_normal::<UnusedIndexes>();
        is_normal::<DuplicateIndexes>();
        is_normal::<VacuumStats>();
        is_normal::<BuffercacheStats>();
        is_normal::<BuffercacheUsage>();
        is_normal::<SslUsed>();
        is_normal::<Connections>();
        is_normal::<CacheHit>();
        is_normal::<DbSettings>();
        is_normal::<PgExtrasError>();
    }
}