pg_extras/
lib.rs

1use std::{
2    collections::HashMap,
3    time::Duration,
4    {env, fmt},
5};
6pub mod diagnose;
7pub mod queries;
8
9#[cfg(feature = "web")]
10pub mod web;
11
12pub use queries::{
13    all_locks::AllLocks,
14    bloat::Bloat,
15    blocking::Blocking,
16    buffercache_stats::BuffercacheStats,
17    buffercache_usage::BuffercacheUsage,
18    cache_hit::CacheHit,
19    calls::Calls,
20    connections::Connections,
21    db_settings::DbSettings,
22    duplicate_indexes::DuplicateIndexes,
23    extensions::Extensions,
24    index_cache_hit::IndexCacheHit,
25    index_scans::IndexScans,
26    index_size::IndexSize,
27    index_usage::IndexUsage,
28    indexes::Indexes,
29    locks::Locks,
30    long_running_queries::LongRunningQueries,
31    mandelbrot::Mandelbrot,
32    null_indexes::NullIndexes,
33    outliers::Outliers,
34    records_rank::RecordsRank,
35    seq_scans::SeqScans,
36    shared::{get_default_schema, Query},
37    ssl_used::SslUsed,
38    table_cache_hit::TableCacheHit,
39    table_index_scans::TableIndexScans,
40    table_indexes_size::TableIndexesSize,
41    table_size::TableSize,
42    tables::Tables,
43    total_index_size::TotalIndexSize,
44    total_table_size::TotalTableSize,
45    unused_indexes::UnusedIndexes,
46    vacuum_stats::VacuumStats,
47};
48use semver::Version;
49use sqlx::{postgres::PgPoolOptions, Pool, Postgres, Row};
50
51#[macro_use]
52extern crate prettytable;
53use prettytable::{Cell, Row as TableRow, Table};
54
55/// Renders a table to stdout for any type that implements the Query trait.
56pub fn render_table<T: Query>(items: Vec<T>) {
57    let mut table = Table::new();
58    table.add_row(T::headers());
59
60    let columns_count = T::headers().len();
61
62    for item in items {
63        table.add_row(item.to_row());
64    }
65    table.set_titles(TableRow::new(vec![
66        Cell::new(T::description().as_str()).style_spec(format!("H{}", columns_count).as_str())
67    ]));
68    table.printstd();
69}
70
71/// Returns table and index bloat in your database ordered by most wasteful.
72pub async fn bloat(pool: &Pool<Postgres>) -> Result<Vec<Bloat>, PgExtrasError> {
73    get_rows(None, pool).await
74}
75
76/// Lists queries that are blocking other queries.
77pub async fn blocking(
78    limit: Option<String>,
79    pool: &Pool<Postgres>,
80) -> Result<Vec<Blocking>, PgExtrasError> {
81    get_rows(Some(limit_params(limit)), pool).await
82}
83
84/// Creates a new connection pool to PostgreSQL.
85///
86/// Uses either PG_EXTRAS_DATABASE_URL or DATABASE_URL environment variable.
87pub async fn pg_pool() -> Result<Pool<Postgres>, PgExtrasError> {
88    match PgPoolOptions::new()
89        .max_connections(5)
90        .acquire_timeout(Duration::from_secs(10))
91        .connect(db_url()?.as_str())
92        .await
93    {
94        Ok(pool) => Ok(pool),
95        Err(e) => Err(PgExtrasError::DbConnectionError(format!("{}", e))),
96    }
97}
98
99/// Returns statistics about query calls in the database.
100pub async fn calls(
101    limit: Option<String>,
102    pool: &Pool<Postgres>,
103) -> Result<Vec<Calls>, PgExtrasError> {
104    get_rows(Some(limit_params(limit)), pool).await
105}
106
107/// Lists all installed PostgreSQL extensions.
108pub async fn extensions(pool: &Pool<Postgres>) -> Result<Vec<Extensions>, PgExtrasError> {
109    get_rows(None, pool).await
110}
111
112/// Shows cache hit rates for tables.
113pub async fn table_cache_hit(pool: &Pool<Postgres>) -> Result<Vec<TableCacheHit>, PgExtrasError> {
114    get_rows(None, pool).await
115}
116
117/// Lists all tables in the database with their basic information.
118pub async fn tables(
119    schema: Option<String>,
120    pool: &Pool<Postgres>,
121) -> Result<Vec<Tables>, PgExtrasError> {
122    get_rows(Some(schema_params(schema)), pool).await
123}
124
125/// Shows index cache hit rates.
126pub async fn index_cache_hit(
127    schema: Option<String>,
128    pool: &Pool<Postgres>,
129) -> Result<Vec<IndexCacheHit>, PgExtrasError> {
130    get_rows(Some(schema_params(schema)), pool).await
131}
132
133/// Lists all indexes in the database.
134pub async fn indexes(pool: &Pool<Postgres>) -> Result<Vec<Indexes>, PgExtrasError> {
135    get_rows(None, pool).await
136}
137
138/// Shows the size of all indexes, ordered by size.
139pub async fn index_size(pool: &Pool<Postgres>) -> Result<Vec<IndexSize>, PgExtrasError> {
140    get_rows(None, pool).await
141}
142
143/// Shows statistics about index usage.
144pub async fn index_usage(
145    schema: Option<String>,
146    pool: &Pool<Postgres>,
147) -> Result<Vec<IndexUsage>, PgExtrasError> {
148    get_rows(Some(schema_params(schema)), pool).await
149}
150
151/// Shows statistics about index scans.
152pub async fn index_scans(
153    schema: Option<String>,
154    pool: &Pool<Postgres>,
155) -> Result<Vec<IndexScans>, PgExtrasError> {
156    get_rows(Some(schema_params(schema)), pool).await
157}
158
159/// Shows indexes that contain mostly NULL values.
160pub async fn null_indexes(
161    min_relation_size_mb: Option<String>,
162    pool: &Pool<Postgres>,
163) -> Result<Vec<NullIndexes>, PgExtrasError> {
164    let min_relation_size_mb = min_relation_size_mb.unwrap_or("10".to_string());
165
166    let params: HashMap<String, String> = [(
167        "min_relation_size_mb".to_string(),
168        min_relation_size_mb.to_string(),
169    )]
170    .iter()
171    .cloned()
172    .collect();
173    get_rows(Some(params), pool).await
174}
175
176/// Shows information about locks in the database.
177pub async fn locks(pool: &Pool<Postgres>) -> Result<Vec<Locks>, PgExtrasError> {
178    get_rows(None, pool).await
179}
180
181/// Shows detailed information about all locks in the database.
182pub async fn all_locks(pool: &Pool<Postgres>) -> Result<Vec<AllLocks>, PgExtrasError> {
183    get_rows(None, pool).await
184}
185
186/// Lists currently running queries that have been running for a long time.
187pub async fn long_running_queries(
188    pool: &Pool<Postgres>,
189) -> Result<Vec<LongRunningQueries>, PgExtrasError> {
190    get_rows(None, pool).await
191}
192
193/// Generates a Mandelbrot set as a test query.
194pub async fn mandelbrot(pool: &Pool<Postgres>) -> Result<Vec<Mandelbrot>, PgExtrasError> {
195    get_rows(None, pool).await
196}
197
198/// Shows queries with the longest execution time in aggregate.
199pub async fn outliers(pool: &Pool<Postgres>) -> Result<Vec<Outliers>, PgExtrasError> {
200    get_rows(None, pool).await
201}
202
203/// Shows estimated number of rows in each table, ordered by estimated count.
204pub async fn records_rank(
205    schema: Option<String>,
206    pool: &Pool<Postgres>,
207) -> Result<Vec<RecordsRank>, PgExtrasError> {
208    get_rows(Some(schema_params(schema)), pool).await
209}
210
211/// Shows statistics about sequential scans performed on tables.
212pub async fn seq_scans(
213    schema: Option<String>,
214    pool: &Pool<Postgres>,
215) -> Result<Vec<SeqScans>, PgExtrasError> {
216    get_rows(Some(schema_params(schema)), pool).await
217}
218
219/// Shows statistics about index scans performed on tables.
220pub async fn table_index_scans(
221    schema: Option<String>,
222    pool: &Pool<Postgres>,
223) -> Result<Vec<TableIndexScans>, PgExtrasError> {
224    get_rows(Some(schema_params(schema)), pool).await
225}
226
227/// Shows total size of all indexes for each table.
228pub async fn table_indexes_size(
229    schema: Option<String>,
230    pool: &Pool<Postgres>,
231) -> Result<Vec<TableIndexesSize>, PgExtrasError> {
232    get_rows(Some(schema_params(schema)), pool).await
233}
234
235/// Shows disk space used by each table, excluding indexes.
236pub async fn table_size(pool: &Pool<Postgres>) -> Result<Vec<TableSize>, PgExtrasError> {
237    get_rows(None, pool).await
238}
239
240/// Shows total size of all indexes in the database.
241pub async fn total_index_size(pool: &Pool<Postgres>) -> Result<Vec<TotalIndexSize>, PgExtrasError> {
242    get_rows(None, pool).await
243}
244
245/// Shows total disk space used by tables and indexes.
246pub async fn total_table_size(pool: &Pool<Postgres>) -> Result<Vec<TotalTableSize>, PgExtrasError> {
247    get_rows(None, pool).await
248}
249
250/// Lists indexes that haven't been used or are rarely used.
251pub async fn unused_indexes(
252    schema: Option<String>,
253    pool: &Pool<Postgres>,
254) -> Result<Vec<UnusedIndexes>, PgExtrasError> {
255    get_rows(Some(schema_params(schema)), pool).await
256}
257
258/// Shows indexes that have identical definitions but different names.
259pub async fn duplicate_indexes(
260    pool: &Pool<Postgres>,
261) -> Result<Vec<DuplicateIndexes>, PgExtrasError> {
262    get_rows(None, pool).await
263}
264
265/// Shows statistics about VACUUM and ANALYZE operations.
266pub async fn vacuum_stats(pool: &Pool<Postgres>) -> Result<Vec<VacuumStats>, PgExtrasError> {
267    get_rows(None, pool).await
268}
269
270/// Shows statistics about shared buffer cache usage.
271pub async fn buffercache_stats(
272    pool: &Pool<Postgres>,
273) -> Result<Vec<BuffercacheStats>, PgExtrasError> {
274    get_rows(None, pool).await
275}
276
277/// Shows distribution of buffer cache usage by database objects.
278pub async fn buffercache_usage(
279    pool: &Pool<Postgres>,
280) -> Result<Vec<BuffercacheUsage>, PgExtrasError> {
281    get_rows(None, pool).await
282}
283
284/// Shows whether SSL is being used for current connections.
285pub async fn ssl_used(pool: &Pool<Postgres>) -> Result<Vec<SslUsed>, PgExtrasError> {
286    get_rows(None, pool).await
287}
288
289/// Shows information about current database connections and their states.
290pub async fn connections(pool: &Pool<Postgres>) -> Result<Vec<Connections>, PgExtrasError> {
291    get_rows(None, pool).await
292}
293
294/// Shows cache hit rates for both tables and indexes.
295pub async fn cache_hit(
296    schema: Option<String>,
297    pool: &Pool<Postgres>,
298) -> Result<Vec<CacheHit>, PgExtrasError> {
299    get_rows(Some(schema_params(schema)), pool).await
300}
301
302/// Shows current values of important PostgreSQL settings.
303pub async fn db_settings(pool: &Pool<Postgres>) -> Result<Vec<DbSettings>, PgExtrasError> {
304    get_rows(None, pool).await
305}
306
/// Runs a comprehensive set of diagnostic checks on the database.
///
/// Thin public entry point: delegates to `diagnose::run::run_diagnose` with
/// the given pool and returns its result unchanged, one [`CheckResult`] per
/// check on success.
pub async fn diagnose(pool: &Pool<Postgres>) -> Result<Vec<CheckResult>, PgExtrasError> {
    run_diagnose(pool).await
}
311
/// Errors returned by this crate's public functions.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum PgExtrasError {
    /// Neither `PG_EXTRAS_DATABASE_URL` nor `DATABASE_URL` could be read from
    /// the environment.
    MissingConfigVars(),
    /// Connecting to the database failed; holds the underlying error message.
    DbConnectionError(String),
    /// Any other failure, such as a query that failed to execute; holds the
    /// error message.
    Other(String),
}

impl fmt::Display for PgExtrasError {
    /// Writes the user-facing message for each variant directly into `f`,
    /// without building an intermediate `String`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::MissingConfigVars() => {
                f.write_str("Both $DATABASE_URL and $PG_EXTRAS_DATABASE_URL are not set.")
            }
            Self::DbConnectionError(e) => write!(f, "Cannot connect to database: '{}'", e),
            Self::Other(e) => f.write_str(e),
        }
    }
}

impl std::error::Error for PgExtrasError {}
335
336use crate::diagnose::run::{run_diagnose, CheckResult};
337use lazy_static::lazy_static;
338
339lazy_static! {
340    pub static ref NEW_PG_STAT_STATEMENTS: Version = Version::parse("1.8.0").unwrap();
341    pub static ref PG_STAT_STATEMENTS_17: Version = semver::Version::parse("1.11.0").unwrap();
342}
343
/// Which family of `pg_stat_statements` versions is installed; passed to
/// `Query::read_file` when a query's text is loaded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PgStatsVersion {
    /// Installed version is older than `NEW_PG_STAT_STATEMENTS` (1.8.0).
    Legacy,
    /// Installed version is at least 1.8.0 and below 1.11.0, or could not be
    /// determined.
    Standard,
    /// Installed version is `PG_STAT_STATEMENTS_17` (1.11.0) or newer.
    Pg17,
}
350
351async fn get_rows<T: Query>(
352    params: Option<HashMap<String, String>>,
353    pool: &Pool<Postgres>,
354) -> Result<Vec<T>, PgExtrasError> {
355    let pg_statements_query =
356        "select installed_version from pg_available_extensions where name='pg_stat_statements'";
357
358    let pg_statements_version = match sqlx::query(pg_statements_query).fetch_one(pool).await {
359        Ok(row) => row
360            .try_get::<String, _>("installed_version")
361            .unwrap_or_default(),
362        Err(_) => "".to_string(),
363    };
364
365    let default_version = NEW_PG_STAT_STATEMENTS.clone();
366    let pg_statements_version = format!("{}.0", pg_statements_version);
367    let pg_statements_version =
368        Version::parse(&pg_statements_version).unwrap_or(default_version.clone());
369
370    let pg_statements_version = if pg_statements_version < default_version {
371        PgStatsVersion::Legacy
372    } else if pg_statements_version >= *PG_STAT_STATEMENTS_17 {
373        PgStatsVersion::Pg17
374    } else {
375        PgStatsVersion::Standard
376    };
377
378    let mut query = T::read_file(Some(pg_statements_version));
379
380    if let Some(params) = params {
381        for (key, value) in &params {
382            query = query.replace(&format!("%{{{}}}", key), value.as_str());
383        }
384    }
385
386    Ok(match sqlx::query(&query).fetch_all(pool).await {
387        Ok(rows) => rows.iter().map(T::new).collect(),
388        Err(e) => return Err(PgExtrasError::Other(format!("{}", e))),
389    })
390}
391
392fn db_url() -> Result<String, PgExtrasError> {
393    env::var("PG_EXTRAS_DATABASE_URL")
394        .or_else(|_| env::var("DATABASE_URL"))
395        .map_err(|_| PgExtrasError::MissingConfigVars())
396}
397
398fn schema_params(schema_name: Option<String>) -> HashMap<String, String> {
399    let schema_name = schema_name.unwrap_or(get_default_schema());
400    [("schema".to_string(), schema_name.to_string())]
401        .iter()
402        .cloned()
403        .collect()
404}
405
/// Builds the single-entry parameter map `{"limit": <value>}`.
///
/// Falls back to `"10"` when `limit` is `None`. The default is allocated only
/// when needed, and the owned value is moved into the map rather than cloned.
fn limit_params(limit: Option<String>) -> HashMap<String, String> {
    let limit = limit.unwrap_or_else(|| "10".to_string());
    HashMap::from([("limit".to_string(), limit)])
}
413
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnose::report::render_diagnose_report;

    /// Prepares the environment for the integration test.
    ///
    /// Picks the local port for the PostgreSQL major version named by
    /// `PG_VERSION` (unrecognized values use 5432), exports
    /// `PG_EXTRAS_DATABASE_URL` accordingly, and creates the `sslinfo`,
    /// `pg_stat_statements` and `pg_buffercache` extensions if they do not
    /// exist yet.
    ///
    /// Panics when `PG_VERSION` is not set.
    async fn setup() -> Result<(), Box<dyn std::error::Error>> {
        let port = match env::var("PG_VERSION").expect("PG_VERSION not set").as_str() {
            "12" => "5432",
            "13" => "5433",
            "14" => "5434",
            "15" => "5435",
            "16" => "5436",
            "17" => "5437",
            _ => "5432",
        };

        env::set_var(
            "PG_EXTRAS_DATABASE_URL",
            format!(
                "postgres://postgres:secret@localhost:{}/pg-extras-rs-test",
                port
            ),
        );

        let pool = PgPoolOptions::new()
            .max_connections(5)
            .connect(db_url()?.as_str())
            .await?;

        for extension in ["sslinfo", "pg_stat_statements", "pg_buffercache"] {
            let query = format!("CREATE EXTENSION IF NOT EXISTS {};", extension);
            sqlx::query(&query).execute(&pool).await?;
        }

        Ok(())
    }

    /// Smoke test: runs every public query against the test database and
    /// renders the result, failing on the first error.
    #[tokio::test]
    async fn it_works() -> Result<(), Box<dyn std::error::Error>> {
        setup().await?;

        let pool = pg_pool().await?;

        render_table(cache_hit(None, &pool).await?);
        render_table(bloat(&pool).await?);
        render_table(blocking(None, &pool).await?);
        render_table(calls(None, &pool).await?);
        render_table(extensions(&pool).await?);
        render_table(table_cache_hit(&pool).await?);
        render_table(seq_scans(None, &pool).await?);
        render_table(table_index_scans(None, &pool).await?);
        render_table(table_indexes_size(None, &pool).await?);
        render_table(tables(None, &pool).await?);
        render_table(index_cache_hit(None, &pool).await?);
        render_table(indexes(&pool).await?);
        render_table(index_size(&pool).await?);
        render_table(index_usage(None, &pool).await?);
        render_table(index_scans(None, &pool).await?);
        render_table(null_indexes(None, &pool).await?);
        render_table(locks(&pool).await?);
        render_table(all_locks(&pool).await?);
        render_table(long_running_queries(&pool).await?);
        render_table(mandelbrot(&pool).await?);
        render_table(outliers(&pool).await?);
        render_table(records_rank(None, &pool).await?);
        render_table(table_size(&pool).await?);
        render_table(total_index_size(&pool).await?);
        render_table(total_table_size(&pool).await?);
        render_table(unused_indexes(None, &pool).await?);
        render_table(duplicate_indexes(&pool).await?);
        render_table(vacuum_stats(&pool).await?);
        render_table(buffercache_stats(&pool).await?);
        render_table(buffercache_usage(&pool).await?);
        render_table(ssl_used(&pool).await?);
        render_table(connections(&pool).await?);
        render_table(db_settings(&pool).await?);
        render_diagnose_report(diagnose(&pool).await?);

        Ok(())
    }

    /// Compile-time check that every public result type, and the error type,
    /// is `Sized + Send + Sync + Unpin`.
    #[test]
    fn normal_types() {
        fn is_normal<T: Sized + Send + Sync + Unpin>() {}

        // Each result type appears exactly once; `BuffercacheUsage` was
        // previously missing while two other types were listed twice.
        is_normal::<Bloat>();
        is_normal::<Blocking>();
        is_normal::<Calls>();
        is_normal::<Extensions>();
        is_normal::<TableCacheHit>();
        is_normal::<Tables>();
        is_normal::<IndexCacheHit>();
        is_normal::<Indexes>();
        is_normal::<IndexSize>();
        is_normal::<IndexUsage>();
        is_normal::<IndexScans>();
        is_normal::<NullIndexes>();
        is_normal::<Locks>();
        is_normal::<AllLocks>();
        is_normal::<LongRunningQueries>();
        is_normal::<Mandelbrot>();
        is_normal::<Outliers>();
        is_normal::<RecordsRank>();
        is_normal::<SeqScans>();
        is_normal::<TableIndexScans>();
        is_normal::<TableIndexesSize>();
        is_normal::<TableSize>();
        is_normal::<TotalIndexSize>();
        is_normal::<TotalTableSize>();
        is_normal::<UnusedIndexes>();
        is_normal::<DuplicateIndexes>();
        is_normal::<VacuumStats>();
        is_normal::<BuffercacheStats>();
        is_normal::<BuffercacheUsage>();
        is_normal::<SslUsed>();
        is_normal::<Connections>();
        is_normal::<CacheHit>();
        is_normal::<DbSettings>();
        is_normal::<PgExtrasError>();
    }
}
535}