athena_rs 3.3.0

Database gateway API
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
//! Collect vacuum / XID health from PostgreSQL catalogs and persist snapshots into the logging DB.

use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
use serde::Serialize;
use sqlx::postgres::PgPool;
use sqlx::{Error, Postgres, Transaction};
use uuid::Uuid;

/// Default: treat tables with dead tuple share at or above this as bloated.
/// Overridable via the `ATHENA_VACUUM_HEALTH_BLOAT_DEAD_PCT` env var (see `VacuumHealthThresholds::from_env`).
pub const DEFAULT_BLOAT_DEAD_PCT: f64 = 10.0;
/// Default: XID age at or above this fraction of `autovacuum_freeze_max_age` counts as freeze risk.
/// Overridable via the `ATHENA_VACUUM_HEALTH_XID_RISK_PCT` env var (see `VacuumHealthThresholds::from_env`).
pub const DEFAULT_XID_RISK_PCT: f64 = 50.0;
/// Default `SET LOCAL statement_timeout` for catalog queries (ms).
pub const DEFAULT_STATEMENT_TIMEOUT_MS: u64 = 60_000;

/// Thresholds for rollup counts (bloat, XID risk, “needs vacuum”).
#[derive(Debug, Clone, Copy)]
pub struct VacuumHealthThresholds {
    /// Dead-tuple percentage at or above which a table counts as bloated.
    pub bloat_dead_pct: f64,
    /// XID age, as a percentage of `autovacuum_freeze_max_age`, at or above
    /// which a table counts as freeze risk.
    pub xid_risk_pct: f64,
}

impl Default for VacuumHealthThresholds {
    fn default() -> Self {
        Self {
            bloat_dead_pct: DEFAULT_BLOAT_DEAD_PCT,
            xid_risk_pct: DEFAULT_XID_RISK_PCT,
        }
    }
}

impl VacuumHealthThresholds {
    /// Read thresholds from the environment, falling back to the module
    /// defaults when a variable is unset or fails to parse as `f64`.
    pub fn from_env() -> Self {
        // Best-effort parse: a missing or garbled value silently uses the fallback.
        fn env_threshold(var: &str, fallback: f64) -> f64 {
            std::env::var(var)
                .ok()
                .and_then(|raw| raw.parse::<f64>().ok())
                .unwrap_or(fallback)
        }

        Self {
            bloat_dead_pct: env_threshold(
                "ATHENA_VACUUM_HEALTH_BLOAT_DEAD_PCT",
                DEFAULT_BLOAT_DEAD_PCT,
            ),
            xid_risk_pct: env_threshold("ATHENA_VACUUM_HEALTH_XID_RISK_PCT", DEFAULT_XID_RISK_PCT),
        }
    }
}

/// One row from `pg_stat_user_tables` + XID age (per target database).
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct VacuumCatalogRow {
    pub schemaname: String,
    pub relname: String,
    // Dead/live tuple estimates from the statistics collector.
    pub n_dead_tup: i64,
    pub n_live_tup: i64,
    /// Percent dead: `100 * n_dead_tup / (n_live_tup + n_dead_tup)`; `0.0` when the table is empty.
    pub dead_pct: f64,
    pub last_vacuum: Option<DateTime<Utc>>,
    pub last_autovacuum: Option<DateTime<Utc>>,
    /// `age(pg_class.relfrozenxid)` for this table.
    pub xid_age: i64,
    /// `NULL` when `freeze_max_age` is zero (avoid division by zero).
    pub xid_age_pct: Option<f64>,
    /// Effective `autovacuum_freeze_max_age` setting (same value on every row).
    pub freeze_max_age: i64,
}

/// Per-database summary counters derived from `VacuumCatalogRow`s by `rollup_vacuum_rows`.
#[derive(Debug, Clone, Copy)]
pub struct VacuumRollup {
    /// Sum of `n_dead_tup` across all tables.
    pub total_dead_rows: i64,
    /// Tables whose `dead_pct` met the bloat threshold.
    pub tables_with_bloat: i32,
    /// Tables whose `xid_age_pct` met the XID-risk threshold.
    pub xid_freeze_risk: i32,
    /// Tables that tripped either threshold (a union, not the sum of the two counts above).
    pub tables_needing_vacuum: i32,
    /// `freeze_max_age` taken from the first row; `None` when there were no rows.
    pub freeze_max_age: Option<i64>,
}

/// Run the catalog query on the **target** client pool (read-only stats).
///
/// Opens a transaction so that `SET LOCAL statement_timeout` applies only to
/// this batch, then joins `pg_stat_user_tables` with `pg_class` to compute the
/// dead-tuple share and XID age per user table.
///
/// # Errors
/// Returns any `sqlx::Error` from the timeout SET, the catalog query, or commit.
pub async fn collect_vacuum_catalog_rows(
    target_pool: &PgPool,
    statement_timeout_ms: u64,
) -> Result<Vec<VacuumCatalogRow>, Error> {
    // Clamp into i64 range so an oversized caller value can't wrap negative.
    let ms: i64 = i64::try_from(statement_timeout_ms).unwrap_or(i64::MAX);
    let mut tx: Transaction<'_, Postgres> = target_pool.begin().await?;
    // SET does not accept bind parameters, so the value is interpolated; `ms`
    // is a clamped integer, so there is no injection surface here.
    sqlx::query(&format!("SET LOCAL statement_timeout = {ms}"))
        .execute(&mut *tx)
        .await?;

    // NOTE: the CTE is deliberately named `freeze_cfg` — a bare `freeze` name
    // is rejected by some Postgres-compatible backends. A unit test in this
    // file greps the source to keep that from regressing.
    let rows: Vec<VacuumCatalogRow> = sqlx::query_as(
        r#"
        WITH freeze_cfg AS (
            SELECT COALESCE(setting::bigint, 200000000) AS freeze_max_age
            FROM pg_settings
            WHERE name = 'autovacuum_freeze_max_age'
            LIMIT 1
        )
        SELECT
            s.schemaname AS schemaname,
            s.relname AS relname,
            s.n_dead_tup::bigint AS n_dead_tup,
            s.n_live_tup::bigint AS n_live_tup,
            CASE
                WHEN (s.n_live_tup + s.n_dead_tup) > 0 THEN
                    (100.0 * s.n_dead_tup::float8 / (s.n_live_tup + s.n_dead_tup)::float8)
                ELSE 0.0
            END AS dead_pct,
            s.last_vacuum AS last_vacuum,
            s.last_autovacuum AS last_autovacuum,
            age(c.relfrozenxid)::bigint AS xid_age,
            CASE
                WHEN f.freeze_max_age > 0 THEN
                    (100.0 * age(c.relfrozenxid)::float8 / f.freeze_max_age::float8)
                ELSE NULL
            END AS xid_age_pct,
            f.freeze_max_age AS freeze_max_age
        FROM pg_stat_user_tables s
        INNER JOIN pg_class c ON c.oid = s.relid
        CROSS JOIN freeze_cfg f
        ORDER BY s.schemaname, s.relname
        "#,
    )
    .fetch_all(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(rows)
}

/// Aggregate per-table catalog rows into summary counters.
///
/// A table "needs vacuum" when it trips either the bloat threshold or the
/// XID-risk threshold, so that count is a union rather than the sum of the
/// other two counters.
pub fn rollup_vacuum_rows(
    rows: &[VacuumCatalogRow],
    thresholds: &VacuumHealthThresholds,
) -> VacuumRollup {
    let mut rollup = VacuumRollup {
        total_dead_rows: 0,
        tables_with_bloat: 0,
        xid_freeze_risk: 0,
        tables_needing_vacuum: 0,
        // Every row carries the same setting; take it from the first when present.
        freeze_max_age: rows.first().map(|row| row.freeze_max_age),
    };

    for row in rows {
        rollup.total_dead_rows += row.n_dead_tup;
        let bloated = row.dead_pct >= thresholds.bloat_dead_pct;
        // A missing percentage (freeze_max_age == 0 upstream) never counts as risk.
        let at_risk = matches!(row.xid_age_pct, Some(pct) if pct >= thresholds.xid_risk_pct);
        rollup.tables_with_bloat += i32::from(bloated);
        rollup.xid_freeze_risk += i32::from(at_risk);
        rollup.tables_needing_vacuum += i32::from(bloated || at_risk);
    }

    rollup
}

/// Persist a successful snapshot and per-table rows into the logging database.
///
/// The snapshot header and all per-table rows are written inside one
/// transaction, so a failure leaves no partial snapshot behind. Returns the
/// new `vacuum_health_snapshots.id`.
///
/// # Errors
/// Any `sqlx::Error` from either insert or the commit.
pub async fn insert_vacuum_health_snapshot(
    logging_pool: &PgPool,
    client_name: &str,
    host: Option<&str>,
    instance_id: Option<Uuid>,
    rollup: &VacuumRollup,
    rows: &[VacuumCatalogRow],
) -> Result<i64, Error> {
    let mut tx: Transaction<'_, Postgres> = logging_pool.begin().await?;

    // collection_error is NULL on this path: it records a successful collection.
    let snapshot_id: i64 = sqlx::query_scalar(
        r#"
        INSERT INTO vacuum_health_snapshots (
            client_name,
            host,
            instance_id,
            total_dead_rows,
            tables_with_bloat,
            xid_freeze_risk,
            tables_needing_vacuum,
            freeze_max_age,
            collection_error
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NULL)
        RETURNING id
        "#,
    )
    .bind(client_name)
    .bind(host)
    .bind(instance_id)
    .bind(rollup.total_dead_rows)
    .bind(rollup.tables_with_bloat)
    .bind(rollup.xid_freeze_risk)
    .bind(rollup.tables_needing_vacuum)
    .bind(rollup.freeze_max_age)
    .fetch_one(&mut *tx)
    .await?;

    // One insert per table row; bind order is positional and must match the
    // column list above each statement.
    for r in rows {
        sqlx::query(
            r#"
            INSERT INTO vacuum_health_table_stats (
                snapshot_id,
                schemaname,
                relname,
                n_dead_tup,
                n_live_tup,
                dead_pct,
                last_vacuum,
                last_autovacuum,
                xid_age,
                xid_age_pct_of_freeze_max
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            "#,
        )
        .bind(snapshot_id)
        .bind(&r.schemaname)
        .bind(&r.relname)
        .bind(r.n_dead_tup)
        .bind(r.n_live_tup)
        .bind(r.dead_pct)
        .bind(r.last_vacuum)
        .bind(r.last_autovacuum)
        .bind(r.xid_age)
        .bind(r.xid_age_pct)
        .execute(&mut *tx)
        .await?;
    }

    tx.commit().await?;
    Ok(snapshot_id)
}

/// Record a failed collection attempt (no table rows).
///
/// Counters are stored as zero and `collection_error` carries the failure
/// text; returns the new snapshot id.
pub async fn insert_vacuum_health_failure(
    logging_pool: &PgPool,
    client_name: &str,
    host: Option<&str>,
    instance_id: Option<Uuid>,
    collection_error: &str,
) -> Result<i64, sqlx::Error> {
    sqlx::query_scalar(
        r#"
        INSERT INTO vacuum_health_snapshots (
            client_name,
            host,
            instance_id,
            total_dead_rows,
            tables_with_bloat,
            xid_freeze_risk,
            tables_needing_vacuum,
            freeze_max_age,
            collection_error
        )
        VALUES ($1, $2, $3, 0, 0, 0, 0, NULL, $4)
        RETURNING id
        "#,
    )
    .bind(client_name)
    .bind(host)
    .bind(instance_id)
    .bind(collection_error)
    .fetch_one(logging_pool)
    .await
}

/// Delete snapshots recorded before `now - retention_days`; returns how many
/// rows were removed. Child table rows are not touched here.
pub async fn prune_vacuum_health_snapshots(
    logging_pool: &PgPool,
    retention_days: i64,
) -> Result<u64, sqlx::Error> {
    let cutoff = Utc::now() - chrono::Duration::days(retention_days);
    let result = sqlx::query("DELETE FROM vacuum_health_snapshots WHERE recorded_at < $1")
        .bind(cutoff)
        .execute(logging_pool)
        .await?;
    Ok(result.rows_affected())
}

/// A `vacuum_health_snapshots` row as read back for API responses.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct VacuumHealthSnapshotRecord {
    pub id: i64,
    pub recorded_at: DateTime<Utc>,
    pub client_name: String,
    pub host: Option<String>,
    pub instance_id: Option<Uuid>,
    pub total_dead_rows: i64,
    pub tables_with_bloat: i32,
    pub xid_freeze_risk: i32,
    pub tables_needing_vacuum: i32,
    pub freeze_max_age: Option<i64>,
    /// Non-`NULL` when the collection attempt failed (see `insert_vacuum_health_failure`).
    pub collection_error: Option<String>,
}

/// A `vacuum_health_table_stats` row as read back for API responses.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct VacuumHealthTableStatRecord {
    pub id: i64,
    /// FK to `vacuum_health_snapshots.id`.
    pub snapshot_id: i64,
    pub schemaname: String,
    pub relname: String,
    pub n_dead_tup: i64,
    pub n_live_tup: i64,
    /// Stored as `NUMERIC` in `vacuum_health_table_stats`; decode as `Decimal`, serialize as JSON float.
    #[serde(with = "rust_decimal::serde::float_option")]
    pub dead_pct: Option<Decimal>,
    pub last_vacuum: Option<DateTime<Utc>>,
    pub last_autovacuum: Option<DateTime<Utc>>,
    pub xid_age: Option<i64>,
    /// Same `NUMERIC`-as-`Decimal` treatment as `dead_pct`.
    #[serde(with = "rust_decimal::serde::float_option")]
    pub xid_age_pct_of_freeze_max: Option<Decimal>,
}

/// Latest snapshot per target `client_name` (distinct on).
///
/// `DISTINCT ON (client_name)` combined with
/// `ORDER BY client_name, recorded_at DESC` keeps exactly the newest row for
/// each client.
pub async fn list_latest_vacuum_health_summaries(
    logging_pool: &PgPool,
) -> Result<Vec<VacuumHealthSnapshotRecord>, sqlx::Error> {
    sqlx::query_as(
        r#"
        SELECT DISTINCT ON (client_name)
            id,
            recorded_at,
            client_name,
            host,
            instance_id,
            total_dead_rows,
            tables_with_bloat,
            xid_freeze_risk,
            tables_needing_vacuum,
            freeze_max_age,
            collection_error
        FROM vacuum_health_snapshots
        ORDER BY client_name, recorded_at DESC
        "#,
    )
    .fetch_all(logging_pool)
    .await
}

/// Latest snapshot for one client plus its per-table stats, or `None` when the
/// client has no snapshots at all.
///
/// Runs two queries (no transaction): first the newest snapshot header, then
/// its child rows keyed by `snapshot_id`.
pub async fn get_latest_vacuum_health_detail(
    logging_pool: &PgPool,
    client_name: &str,
) -> Result<Option<(VacuumHealthSnapshotRecord, Vec<VacuumHealthTableStatRecord>)>, sqlx::Error> {
    let snapshot: Option<VacuumHealthSnapshotRecord> = sqlx::query_as(
        r#"
        SELECT
            id,
            recorded_at,
            client_name,
            host,
            instance_id,
            total_dead_rows,
            tables_with_bloat,
            xid_freeze_risk,
            tables_needing_vacuum,
            freeze_max_age,
            collection_error
        FROM vacuum_health_snapshots
        WHERE client_name = $1
        ORDER BY recorded_at DESC
        LIMIT 1
        "#,
    )
    .bind(client_name)
    .fetch_optional(logging_pool)
    .await?;

    // No snapshot recorded yet for this client.
    let Some(snap) = snapshot else {
        return Ok(None);
    };

    // Worst tables first: highest dead percentage, NULLs at the end.
    let tables: Vec<VacuumHealthTableStatRecord> = sqlx::query_as(
        r#"
        SELECT
            id,
            snapshot_id,
            schemaname,
            relname,
            n_dead_tup,
            n_live_tup,
            dead_pct,
            last_vacuum,
            last_autovacuum,
            xid_age,
            xid_age_pct_of_freeze_max
        FROM vacuum_health_table_stats
        WHERE snapshot_id = $1
        ORDER BY vacuum_health_table_stats.dead_pct DESC NULLS LAST, schemaname, relname
        "#,
    )
    .bind(snap.id)
    .fetch_all(logging_pool)
    .await?;

    Ok(Some((snap, tables)))
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Fabricate a catalog row with only the fields the rollup logic inspects.
    fn row(dead_pct: f64, xid_pct: Option<f64>) -> VacuumCatalogRow {
        VacuumCatalogRow {
            schemaname: "public".to_string(),
            relname: "t".to_string(),
            n_dead_tup: 0,
            n_live_tup: 100,
            dead_pct,
            last_vacuum: None,
            last_autovacuum: None,
            xid_age: 0,
            xid_age_pct: xid_pct,
            freeze_max_age: 200_000_000,
        }
    }

    #[test]
    fn rollup_counts_bloat_and_xid_and_union() {
        let thresholds: VacuumHealthThresholds = VacuumHealthThresholds {
            bloat_dead_pct: 10.0,
            xid_risk_pct: 50.0,
        };
        // One bloated row, one XID-risk row, one clean row — the union is 2.
        let rows: Vec<VacuumCatalogRow> =
            vec![row(11.0, None), row(0.0, Some(60.0)), row(5.0, Some(40.0))];
        let r: VacuumRollup = rollup_vacuum_rows(&rows, &thresholds);
        assert_eq!(r.tables_with_bloat, 1);
        assert_eq!(r.xid_freeze_risk, 1);
        assert_eq!(r.tables_needing_vacuum, 2);
    }

    /// Guard against reintroducing the `freeze` CTE name, which is a reserved
    /// keyword in some Postgres-compatible backends (e.g. Neon) and causes a
    /// syntax error at runtime.
    #[test]
    fn vacuum_query_does_not_use_reserved_freeze_cte_name() {
        // Build the forbidden pattern dynamically so the literal string is never
        // present in this source file and can't accidentally trigger the check.
        let bad_cte: String = ["WITH ", "freeze", " AS"].concat();
        let src = include_str!(concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/src/data/vacuum_health.rs"
        ));
        let hits: Vec<_> = src.match_indices(&bad_cte[..]).collect();
        assert!(
            hits.is_empty(),
            "CTE must not be named 'freeze' — reserved keyword in some backends. \
             Found at: {:?}",
            hits
        );
        assert!(
            src.contains("WITH freeze_cfg"),
            "CTE should be named 'freeze_cfg'"
        );
    }

    #[test]
    fn rollup_all_clean_rows_gives_zero_counters() {
        let thresholds = VacuumHealthThresholds {
            bloat_dead_pct: 10.0,
            xid_risk_pct: 50.0,
        };
        // All rows below both thresholds — every counter stays at zero.
        let rows = vec![row(0.0, Some(5.0)), row(1.0, Some(10.0))];
        let r = rollup_vacuum_rows(&rows, &thresholds);
        assert_eq!(r.tables_with_bloat, 0);
        assert_eq!(r.xid_freeze_risk, 0);
        assert_eq!(r.tables_needing_vacuum, 0);
    }

    #[test]
    fn rollup_empty_rows_is_zero() {
        let thresholds = VacuumHealthThresholds {
            bloat_dead_pct: 10.0,
            xid_risk_pct: 50.0,
        };
        // Empty input: zero counters and no freeze_max_age to report.
        let r = rollup_vacuum_rows(&[], &thresholds);
        assert_eq!(r.total_dead_rows, 0);
        assert_eq!(r.tables_with_bloat, 0);
        assert_eq!(r.xid_freeze_risk, 0);
        assert_eq!(r.tables_needing_vacuum, 0);
        assert!(r.freeze_max_age.is_none());
    }
}