Skip to main content

rivet/source/postgres/
mod.rs

1//! PostgreSQL `Source` implementation.
2//!
3//! Module layout:
4//!
5//! - `mod.rs` (this file) — `PostgresSource` struct + connect/TLS path, the
6//!   transaction-pooler detector, `PgTxnGuard`, sampling helpers
7//!   (`sample_temp_bytes`, `pg_sample_checkpoints_req`, `pg_fetch_work_mem_bytes`),
8//!   `introspect_pg_table_for_chunking`, the cursor + FETCH export loop
9//!   (`pg_run_export`), the `Source` trait impl, and the catalog-hint
10//!   resolver that bridges parsed FROM clauses to `pg_catalog`.
11//! - [`arrow_convert`] — the entire row → Arrow `RecordBatch` pipeline: type
12//!   mapping (`pg_columns_to_schema`, `rivet_type_for_pg_column`), per-cell
13//!   decoders (INTERVAL, UUID, enum, NUMERIC), and the array builders. Kept
14//!   in a sibling because it is the largest single-purpose cluster in this
15//!   driver (~620 LoC) and has zero reverse dependency back into the
16//!   connection / cursor layer.
17//! - [`from_parse`] — pure `&str`/`&[u8]` parser that extracts the simple
18//!   `<schema>.<table>` literal from a user query so the catalog-hint path
19//!   can cast it to `regclass`.  Zero postgres-crate dependency, fully
20//!   unit-tested in isolation.
21
22mod arrow_convert;
23mod from_parse;
24
25use std::collections::HashMap;
26use std::sync::Arc;
27
28use arrow::datatypes::{Schema, SchemaRef};
29use postgres::types::Type;
30use postgres::{Client, NoTls};
31
32use crate::config::{SourceType, TlsConfig};
33use crate::error::Result;
34use crate::source::query::build_export_query;
35use crate::source::tls::build_native_tls;
36use crate::tuning::{ADAPTIVE_SAMPLE_INTERVAL, SourceTuning, next_adaptive_batch_size};
37use crate::types::{ColumnOverrides, SourceColumn, TypeMapping};
38
39use arrow_convert::{pg_columns_to_schema, rivet_type_for_pg_column, rows_to_record_batch_typed};
40use from_parse::try_parse_pg_simple_from_regclass_literal;
41
/// PostgreSQL source backed by a single synchronous `postgres::Client`.
///
/// Built through [`PostgresSource::connect`] or
/// [`PostgresSource::connect_with_tls`], both of which probe for a
/// transaction-mode pooler right after the connection is established.
pub struct PostgresSource {
    /// Connection used by every query this source issues.
    client: Client,
    /// True when two consecutive pg_backend_pid() calls returned different values,
    /// indicating a transaction-mode connection pooler (pgBouncer, Odyssey, etc.).
    transaction_pooler: bool,
}
48
49/// Detect whether the connection is going through a transaction-mode pooler
50/// (pgBouncer, Odyssey, etc.) by comparing backend PIDs across two implicit
51/// transactions. Returns true when PIDs differ — impossible on a direct
52/// connection or session-mode pooler where the same physical backend is kept.
53///
54/// False negatives are possible when pool_size = 1 (the same backend is always
55/// reused), so this is a best-effort warning rather than a hard guarantee.
56fn detect_pg_transaction_pooler(client: &mut Client) -> bool {
57    let pid1: Option<i32> = client
58        .query_one("SELECT pg_backend_pid()", &[])
59        .ok()
60        .and_then(|r| r.try_get(0).ok());
61    let pid2: Option<i32> = client
62        .query_one("SELECT pg_backend_pid()", &[])
63        .ok()
64        .and_then(|r| r.try_get(0).ok());
65    matches!((pid1, pid2), (Some(a), Some(b)) if a != b)
66}
67
68impl PostgresSource {
69    /// Connect with no transport security (legacy path). Prefer [`Self::connect_with_tls`]
70    /// for production workloads so credentials and result sets are not visible on the wire.
71    pub fn connect(url: &str) -> Result<Self> {
72        let mut client = Client::connect(url, NoTls)?;
73        let transaction_pooler = detect_pg_transaction_pooler(&mut client);
74        if transaction_pooler {
75            log::warn!(
76                "transaction-mode connection pooler detected (pgBouncer/Odyssey) — \
77                 SET LOCAL tuning is transaction-scoped; \
78                 LISTEN/NOTIFY and advisory locks are unavailable"
79            );
80        }
81        Ok(Self {
82            client,
83            transaction_pooler,
84        })
85    }
86
87    /// Connect honoring the user's [`TlsConfig`]. When `tls.mode` is
88    /// [`TlsMode::Disable`] this falls back to [`Self::connect`].
89    pub fn connect_with_tls(url: &str, tls: Option<&TlsConfig>) -> Result<Self> {
90        match tls {
91            Some(cfg) if cfg.mode.is_enforced() => {
92                let connector = build_native_tls(cfg)?;
93                let make_tls = postgres_native_tls::MakeTlsConnector::new(connector);
94                let mut client = Client::connect(url, make_tls)?;
95                let transaction_pooler = detect_pg_transaction_pooler(&mut client);
96                if transaction_pooler {
97                    log::warn!(
98                        "transaction-mode connection pooler detected (pgBouncer/Odyssey) — \
99                         SET LOCAL tuning is transaction-scoped; \
100                         LISTEN/NOTIFY and advisory locks are unavailable"
101                    );
102                }
103                Ok(Self {
104                    client,
105                    transaction_pooler,
106                })
107            }
108            _ => Self::connect(url),
109        }
110    }
111}
112
/// RAII guard for an open `BEGIN ... COMMIT` block.
///
/// `commit()` runs `COMMIT` and marks the txn done; if the guard is dropped
/// before `commit()` (early return, `?`-bubbled error, or panic-driven unwind),
/// `Drop` issues a best-effort `ROLLBACK`. Postgres releases any open cursors
/// as part of ROLLBACK, so the cursor declared inside the txn is also cleaned
/// up. Closes the **G1** gap from the DBA audit (cursor leak on panic).
struct PgTxnGuard<'a> {
    /// Connection the transaction was opened on; exclusively borrowed for as
    /// long as the guard lives.
    client: &'a mut Client,
    /// Set after `COMMIT` succeeded; suppresses the `ROLLBACK` in `Drop`.
    committed: bool,
}
124
125impl<'a> PgTxnGuard<'a> {
126    fn begin(client: &'a mut Client) -> Result<Self> {
127        client.batch_execute("BEGIN")?;
128        Ok(Self {
129            client,
130            committed: false,
131        })
132    }
133
134    fn client_mut(&mut self) -> &mut Client {
135        self.client
136    }
137
138    fn commit(mut self) -> Result<()> {
139        self.client.batch_execute("COMMIT")?;
140        self.committed = true;
141        Ok(())
142    }
143}
144
145impl Drop for PgTxnGuard<'_> {
146    fn drop(&mut self) {
147        if !self.committed
148            && let Err(e) = self.client.batch_execute("ROLLBACK")
149        {
150            // Drop must not panic. Worst case the connection is poisoned and
151            // the pool recycles it; log so operators see it.
152            log::warn!("PgTxnGuard: ROLLBACK during drop failed: {e:#}");
153        }
154    }
155}
156
157/// Snapshot `pg_stat_database.temp_bytes` for the current database.
158///
159/// Used by the pipeline job to compute per-run cursor / sort spill: we capture
160/// the cluster-wide counter immediately before and after each export and
161/// surface the delta on the run summary card. Failures (connect, query) return
162/// `None` — the metric is informational, not a correctness signal.
163///
164/// Note this is a cluster-level counter: concurrent activity from other
165/// connections during the run inflates the delta. For a single-tenant test
166/// box (the common pilot setup) it is accurate; for shared hosts it is a
167/// noisy upper bound, useful as a "your workload was loud" signal.
168pub(crate) fn sample_temp_bytes(url: &str, tls: Option<&TlsConfig>) -> Option<i64> {
169    let mut client = connect_client(url, tls).ok()?;
170    client
171        .query_one(
172            "SELECT temp_bytes::bigint FROM pg_stat_database WHERE datname = current_database()",
173            &[],
174        )
175        .ok()
176        .and_then(|r| r.try_get::<_, i64>(0).ok())
177}
178
179/// Probe `SHOW work_mem` and return the value in bytes.
180///
181/// PostgreSQL spills FETCH-cursor output to `pgsql_tmp/` once the in-flight
182/// row set exceeds `work_mem` — on wide rows with the default 4 MB the spill
183/// fires on every chunk and dominates `pg_stat_database.temp_bytes`. Knowing
184/// the value lets the cursor loop cap FETCH N below `work_mem × 0.7`, keeping
185/// the result set in memory.
186///
187/// Returns None on any parse / query failure — the cursor loop falls back to
188/// the configured static batch_size in that case.
189fn pg_fetch_work_mem_bytes(client: &mut Client) -> Option<i64> {
190    let raw: Option<String> = client
191        .query_one("SHOW work_mem", &[])
192        .ok()
193        .and_then(|r| r.try_get::<_, String>(0).ok());
194    raw.as_deref().and_then(parse_work_mem)
195}
196
/// Convert a `SHOW work_mem` string into a byte count.
///
/// Accepted shapes are a number followed by an optional, case-insensitive
/// unit: `"4MB"`, `"16384kB"`, `"1GB"`, `"2TB"`. A bare number is read as kB.
/// All units are 1024-based, matching what `postgresql.conf` accepts.
///
/// Returns `None` when the input has no numeric prefix, the number does not
/// parse, the unit is unknown, or the result is not strictly positive.
fn parse_work_mem(raw: &str) -> Option<i64> {
    const KIB: f64 = 1024.0;

    let trimmed = raw.trim();
    // The numeric prefix ends at the first char that cannot be part of a
    // plain decimal literal; with no such char the whole input is the number.
    let boundary = trimmed
        .find(|c: char| !(c.is_ascii_digit() || c == '.' || c == '-'))
        .unwrap_or(trimmed.len());
    if boundary == 0 {
        return None;
    }

    let (digits, suffix) = trimmed.split_at(boundary);
    let value = digits.parse::<f64>().ok()?;
    let scale = match suffix.trim().to_ascii_lowercase().as_str() {
        "" | "kb" => KIB,
        "mb" => KIB * KIB,
        "gb" => KIB * KIB * KIB,
        "tb" => KIB * KIB * KIB * KIB,
        _ => return None,
    };

    let bytes = (value * scale) as i64;
    if bytes > 0 { Some(bytes) } else { None }
}
229
230/// Sample `checkpoints_req` from `pg_stat_bgwriter`.
231///
232/// PostgreSQL caches the statistics snapshot at the start of each transaction.
233/// We call `pg_stat_clear_snapshot()` first to discard that cache so every
234/// adaptive sample sees fresh counters rather than the frozen value from BEGIN.
235fn pg_sample_checkpoints_req(client: &mut Client) -> Option<i64> {
236    let _ = client.execute("SELECT pg_stat_clear_snapshot()", &[]);
237    client
238        .query_one("SELECT checkpoints_req FROM pg_stat_bgwriter", &[])
239        .ok()
240        .and_then(|r| r.try_get::<_, i64>(0).ok())
241}
242
243/// Probe `pg_class` and `pg_index` for the stats chunked-mode planning needs.
244///
245/// Returns a [`crate::source::TableIntrospection`] populated from one connection
246/// (two round-trips total: one stats query, one PK query). Failure to connect
247/// or to query bubbles up as `Err`; missing rows or unanalyzed tables are
248/// represented as zero/None in the result so callers can decide policy.
249///
250/// The `qualified_table` argument is `<schema>.<table>` (e.g. `public.users`)
251/// or bare `<table>` (resolved under `public`). It is split internally with
252/// the same strict rules as the `table:` YAML shortcut — anything more
253/// elaborate must use the explicit-column path.
254pub(crate) fn introspect_pg_table_for_chunking(
255    url: &str,
256    tls: Option<&TlsConfig>,
257    qualified_table: &str,
258) -> Result<crate::source::TableIntrospection> {
259    let (schema, table) = match qualified_table.split_once('.') {
260        Some((s, t)) => (s.to_string(), t.to_string()),
261        None => ("public".to_string(), qualified_table.to_string()),
262    };
263    let mut client = connect_client(url, tls)?;
264
265    // ── reltuples + heap size, in one shot ──────────────────────────────
266    let (row_estimate, rel_size_bytes) = match client.query_opt(
267        "SELECT c.reltuples::bigint, pg_relation_size(c.oid)::bigint \
268         FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace \
269         WHERE n.nspname = $1::text AND c.relname = $2::text",
270        &[&schema, &table],
271    )? {
272        Some(row) => {
273            let rt: i64 = row.try_get(0).unwrap_or(0);
274            let sz: i64 = row.try_get(1).unwrap_or(0);
275            (rt.max(0), sz.max(0))
276        }
277        None => (0, 0),
278    };
279    let avg_row_bytes = if row_estimate > 0 {
280        Some(rel_size_bytes / row_estimate)
281    } else {
282        None
283    };
284
285    // ── single int PK probe ─────────────────────────────────────────────
286    let pk_rows = client.query(
287        "SELECT a.attname::text, t.typname::text \
288         FROM pg_index i \
289         JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) \
290         JOIN pg_type t ON t.oid = a.atttypid \
291         WHERE i.indrelid = (($1::text || '.' || $2::text)::regclass) \
292           AND i.indisprimary",
293        &[&schema, &table],
294    )?;
295    let single_int_pk = if pk_rows.len() == 1 {
296        let col: String = pk_rows[0].get(0);
297        let pg_type: String = pk_rows[0].get(1);
298        // Only integer-family types are safe for range chunking via min/max →
299        // BETWEEN slicing. Text/UUID/decimal would need different splitting
300        // logic and are excluded from auto-resolution.
301        if matches!(pg_type.as_str(), "int2" | "int4" | "int8") {
302            Some(col)
303        } else {
304            log::debug!(
305                "introspect_pg_table: PK '{col}' on {schema}.{table} has non-int type '{pg_type}' — skipping auto-resolve"
306            );
307            None
308        }
309    } else {
310        None
311    };
312
313    // ── keyset keys (OPT-4): single-column, NOT NULL, UNIQUE indexes ────
314    // `indnkeyatts = 1` keeps single-column indexes; `indkey[0] = a.attnum`
315    // binds to a real column (not an expression index); `attnotnull` removes
316    // NULL-ordering ambiguity. Index-backed + unique ⇒ keyset's `ORDER BY key
317    // LIMIT n` is a range scan and `WHERE key > last` never skips dup keys.
318    let keyset_rows = client.query(
319        "SELECT a.attname::text, i.indisprimary \
320         FROM pg_index i \
321         JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = i.indkey[0] \
322         WHERE i.indrelid = (($1::text || '.' || $2::text)::regclass) \
323           AND i.indisunique AND i.indnkeyatts = 1 AND a.attnotnull",
324        &[&schema, &table],
325    )?;
326    let mut keyset_keys: Vec<String> = Vec::new();
327    for primary in [true, false] {
328        for row in &keyset_rows {
329            let col: String = row.get(0);
330            let is_primary: bool = row.get(1);
331            if is_primary == primary && !keyset_keys.contains(&col) {
332                keyset_keys.push(col);
333            }
334        }
335    }
336
337    Ok(crate::source::TableIntrospection {
338        single_int_pk,
339        keyset_keys,
340        row_estimate,
341        avg_row_bytes,
342    })
343}
344
345/// Open a bare `postgres::Client` honoring the configured TLS policy.
346///
347/// Shared by preflight, doctor, and init so every code path that connects to
348/// Postgres applies the same transport-security rules. `tls = None` or
349/// `mode: disable` falls back to the insecure `NoTls` transport — a warning is
350/// logged from `create_source` so operators know TLS is off.
351pub(crate) fn connect_client(url: &str, tls: Option<&TlsConfig>) -> Result<Client> {
352    match tls {
353        Some(cfg) if cfg.mode.is_enforced() => {
354            let connector = build_native_tls(cfg)?;
355            let make_tls = postgres_native_tls::MakeTlsConnector::new(connector);
356            Ok(Client::connect(url, make_tls)?)
357        }
358        _ => Ok(Client::connect(url, NoTls)?),
359    }
360}
361
/// Run the full export transaction against an open Postgres client.
///
/// Flow: `BEGIN` (under a [`PgTxnGuard`]) → optional `SET LOCAL` timeouts →
/// probe `work_mem` → `DECLARE _rivet` cursor over `built_sql` → repeated
/// `FETCH N` converted to Arrow batches and pushed into `sink` → `CLOSE` →
/// `COMMIT`.
///
/// All session-mutating SET commands use SET LOCAL so they are scoped to
/// the transaction and reset automatically on COMMIT or ROLLBACK. On any
/// early return the guard's `Drop` issues the ROLLBACK, so the caller does
/// not have to clean up after an `Err`.
///
/// The FETCH size starts as a small probe, is then capped from the observed
/// row width and `work_mem` (when known), and may be adjusted further by the
/// adaptive sampler when `tuning.adaptive` is set.
///
/// Returns (total_rows, had_schema). had_schema is false only when the query
/// returned zero rows; the caller must emit an empty schema in that case.
fn pg_run_export(
    client: &mut Client,
    built_sql: &str,
    tuning: &SourceTuning,
    column_overrides: &ColumnOverrides,
    sink: &mut dyn super::BatchSink,
    numeric_hints: Option<&HashMap<String, (u8, i8)>>,
) -> Result<(usize, bool)> {
    // Open the txn under guard *first* — if SET LOCAL or DECLARE fails below,
    // Drop will roll back. Without the guard, a failure between BEGIN and the
    // explicit ROLLBACK in the caller would leak a half-set-up txn into the pool.
    let mut guard = PgTxnGuard::begin(client)?;
    // A timeout of 0 means "not configured": leave the server default alone.
    if tuning.statement_timeout_s > 0 {
        guard.client_mut().batch_execute(&format!(
            "SET LOCAL statement_timeout = '{}s'",
            tuning.statement_timeout_s
        ))?;
    }
    if tuning.lock_timeout_s > 0 {
        guard.client_mut().batch_execute(&format!(
            "SET LOCAL lock_timeout = '{}s'",
            tuning.lock_timeout_s
        ))?;
    }
    // Cap FETCH N under `work_mem × 0.7` so the cursor never spills to
    // `pgsql_tmp/`. Without this, a wide-row chunk with the default
    // `batch_size: 50000` × ~4 KB/row = ~200 MB easily exceeds the typical
    // `work_mem: 4 MB` and writes the entire chunk to disk before the first
    // FETCH returns. Measured cost on the content_items bench: ~3.2 GB of
    // temp_bytes per export, dominating the DB-side signal report.
    let work_mem_bytes = pg_fetch_work_mem_bytes(guard.client_mut());

    guard
        .client_mut()
        .batch_execute(&format!("DECLARE _rivet NO SCROLL CURSOR FOR {built_sql}"))?;

    // First FETCH is intentionally small — it acts as a row-width probe.
    // Without it we can't know `arrow_bytes/row` before the cursor runs, and a
    // single FETCH of `tuning.batch_size` × wide rows already triggers spill.
    // 500 wide rows × 4 KB ≈ 2 MB, well under the typical work_mem of 4 MB.
    const PROBE_FETCH_SIZE: usize = 500;
    let configured_batch_size = tuning.batch_size;
    let mut fetch_size = configured_batch_size.min(PROBE_FETCH_SIZE);
    // The FETCH statement is cached and rebuilt only when `fetch_size` changes.
    let mut fetch_sql = format!("FETCH {} FROM _rivet", fetch_size);
    let mut work_mem_cap_applied = false;
    let mut schema: Option<SchemaRef> = None;
    let mut columns_cache: Option<Vec<(String, Type)>> = None;
    let mut total_rows: usize = 0;
    // Reference size handed to `next_adaptive_batch_size` alongside the
    // current size; updated whenever the non-adaptive sizing logic resizes.
    let mut base_fetch_size = fetch_size;
    // Baseline for the adaptive sampler; only taken when adaptive mode is on.
    let mut adaptive_last_ckpt: Option<i64> = if tuning.adaptive {
        pg_sample_checkpoints_req(guard.client_mut())
    } else {
        None
    };
    let mut batch_count: usize = 0;

    loop {
        // `fetch_size` may be resized mid-iteration (probe → work_mem cap
        // adjustment). The end-of-loop "exhausted" check must compare against
        // the size we actually requested in this round, not the resized one.
        let requested_this_iter = fetch_size;
        let rows = guard.client_mut().query(&fetch_sql, &[])?;
        if rows.is_empty() {
            break;
        }

        // First non-empty batch: derive the Arrow schema and the column
        // name/type cache from the row metadata, and announce the schema to
        // the sink before any batch is delivered.
        if schema.is_none() {
            let stmt_cols: Vec<(String, Type)> = rows[0]
                .columns()
                .iter()
                .map(|c| (c.name().to_string(), c.type_().clone()))
                .collect();
            let s = Arc::new(pg_columns_to_schema(
                rows[0].columns(),
                column_overrides,
                numeric_hints,
            )?);
            sink.on_schema(s.clone())?;
            schema = Some(s.clone());
            columns_cache = Some(stmt_cols);

            // Defer the work_mem cap to after we measure the actual batch
            // bytes below — schema-only estimates under-count wide TEXT.
            // Only when work_mem is unknown do we resize from the schema
            // estimate here, and then only upwards from the probe size.
            let effective = tuning.effective_batch_size(Some(&s));
            if effective != fetch_size && work_mem_bytes.is_none() {
                fetch_size = effective.max(fetch_size);
                fetch_sql = format!("FETCH {} FROM _rivet", fetch_size);
            }
            base_fetch_size = fetch_size;
        }

        let row_count = rows.len();
        total_rows += row_count;

        let s = schema.as_ref().expect("schema set on first iteration");
        let cols = columns_cache
            .as_ref()
            .expect("columns set on first iteration");
        let batch = rows_to_record_batch_typed(s, cols, &rows)?;
        // Release the driver rows as soon as the Arrow batch exists.
        drop(rows);

        // After the first (small probe) batch we know the actual row width.
        // Compute the cursor-spill-safe FETCH size from observed Arrow bytes:
        //   pg_row_bytes ≈ arrow_per_row × 1.2  (small fudge for tuple header)
        //   safe = work_mem × 0.7 / pg_row_bytes
        // Subsequent FETCHes use that, clamped never to exceed the user's
        // configured `batch_size` (so an explicit knob still wins).
        if !work_mem_cap_applied
            && let Some(wm) = work_mem_bytes
            && row_count > 0
        {
            let arrow_bytes = crate::tuning::SourceTuning::batch_memory_bytes(&batch);
            let arrow_per_row = (arrow_bytes / row_count).max(1);
            let pg_per_row = ((arrow_per_row * 12) / 10).max(64);
            let safe = (((wm as f64) * 0.7) as usize / pg_per_row).max(100);
            let mut target = safe.min(configured_batch_size);
            // An explicit Arrow memory budget tightens the target further.
            if let Some(mem_mb) = tuning.batch_size_memory_mb {
                let arrow_target = (mem_mb * 1024 * 1024) / arrow_per_row;
                target = target.min(arrow_target.max(100));
            }
            if target != fetch_size {
                log::info!(
                    "PG work_mem={} B, observed row={} B (arrow), pg≈{} B → FETCH N {} → {} (configured={})",
                    wm,
                    arrow_per_row,
                    pg_per_row,
                    fetch_size,
                    target,
                    configured_batch_size,
                );
                fetch_size = target;
                fetch_sql = format!("FETCH {} FROM _rivet", fetch_size);
                base_fetch_size = fetch_size;
            }
            // One-shot: the cap is computed from the first batch only.
            work_mem_cap_applied = true;
        }

        sink.on_batch(&batch)?;

        // Adaptive mode: every ADAPTIVE_SAMPLE_INTERVAL batches, re-sample the
        // checkpoint counter; a rise since the previous sample counts as
        // pressure and feeds `next_adaptive_batch_size`.
        batch_count += 1;
        if tuning.adaptive
            && batch_count.is_multiple_of(ADAPTIVE_SAMPLE_INTERVAL)
            && let Some(cur) = pg_sample_checkpoints_req(guard.client_mut())
        {
            let under_pressure = adaptive_last_ckpt.is_some_and(|prev| cur > prev);
            adaptive_last_ckpt = Some(cur);
            let next = next_adaptive_batch_size(fetch_size, base_fetch_size, under_pressure);
            if next != fetch_size {
                fetch_size = next;
                fetch_sql = format!("FETCH {} FROM _rivet", fetch_size);
                log::info!(
                    "adaptive batch size → {} ({})",
                    fetch_size,
                    if under_pressure {
                        "pressure"
                    } else {
                        "recovery"
                    }
                );
            }
        }

        log::info!("fetched {} rows so far...", total_rows);

        // A short batch means the cursor is exhausted; skip the extra
        // round-trip that would only return zero rows.
        if row_count < requested_this_iter {
            break;
        }

        // Optional pause between FETCHes to reduce load on the source.
        if tuning.throttle_ms > 0 {
            std::thread::sleep(std::time::Duration::from_millis(tuning.throttle_ms));
        }
    }

    // Explicit CLOSE is technically redundant — COMMIT releases the cursor —
    // but it documents intent and surfaces any close errors before COMMIT.
    guard.client_mut().batch_execute("CLOSE _rivet")?;
    guard.commit()?;
    Ok((total_rows, schema.is_some()))
}
549
550impl super::Source for PostgresSource {
551    fn export(
552        &mut self,
553        request: &super::ExportRequest<'_>,
554        sink: &mut dyn super::BatchSink,
555    ) -> Result<()> {
556        let built = build_export_query(request, SourceType::Postgres);
557        debug_assert!(
558            built.cursor_param.is_none(),
559            "Postgres path inlines cursor values as E'…' literals — binding is unused"
560        );
561        log::debug!(
562            "executing query (connection={}): {}",
563            if self.transaction_pooler {
564                "transaction-pooler"
565            } else {
566                "direct"
567            },
568            built.sql
569        );
570
571        let numeric_hints = pg_numeric_catalog_hints_opt(&mut self.client, request.query);
572
573        // PgTxnGuard inside pg_run_export rolls the txn back automatically on
574        // any error or panic, so no explicit ROLLBACK is needed here.
575        let (total_rows, had_schema) = pg_run_export(
576            &mut self.client,
577            &built.sql,
578            request.tuning,
579            request.column_overrides,
580            sink,
581            numeric_hints.as_ref(),
582        )?;
583
584        if !had_schema {
585            sink.on_schema(Arc::new(Schema::empty()))?;
586        }
587
588        log::info!("total: {} rows", total_rows);
589        Ok(())
590    }
591
592    fn query_scalar(&mut self, sql: &str) -> Result<Option<String>> {
593        let rows = self.client.query(sql, &[])?;
594        if rows.is_empty() {
595            return Ok(None);
596        }
597        let row = &rows[0];
598        if let Ok(Some(v)) = row.try_get::<_, Option<i64>>(0) {
599            return Ok(Some(v.to_string()));
600        }
601        if let Ok(Some(v)) = row.try_get::<_, Option<i32>>(0) {
602            return Ok(Some(v.to_string()));
603        }
604        if let Ok(Some(v)) = row.try_get::<_, Option<f64>>(0) {
605            return Ok(Some(v.to_string()));
606        }
607        // TIMESTAMP / DATE / TIMESTAMPTZ — required for MIN/MAX on time columns (e.g. chunk_by_days)
608        if let Ok(Some(v)) = row.try_get::<_, Option<chrono::NaiveDateTime>>(0) {
609            return Ok(Some(v.format("%Y-%m-%d %H:%M:%S").to_string()));
610        }
611        if let Ok(Some(v)) = row.try_get::<_, Option<chrono::NaiveDate>>(0) {
612            return Ok(Some(v.format("%Y-%m-%d").to_string()));
613        }
614        if let Ok(Some(v)) = row.try_get::<_, Option<chrono::DateTime<chrono::Utc>>>(0) {
615            return Ok(Some(v.format("%Y-%m-%d %H:%M:%S").to_string()));
616        }
617        if let Ok(Some(v)) = row.try_get::<_, Option<String>>(0) {
618            return Ok(Some(v));
619        }
620        Ok(None)
621    }
622
623    fn type_mappings(
624        &mut self,
625        query: &str,
626        column_overrides: &ColumnOverrides,
627    ) -> Result<Vec<TypeMapping>> {
628        let wrapped = format!("SELECT * FROM ({}) AS _rivet_type_probe LIMIT 0", query);
629        let stmt = self.client.prepare(&wrapped)?;
630        let hints = pg_numeric_catalog_hints_opt(&mut self.client, query);
631        let mappings = stmt
632            .columns()
633            .iter()
634            .map(|col| {
635                let rivet = rivet_type_for_pg_column(col, column_overrides, hints.as_ref());
636                let source = SourceColumn::simple(col.name(), col.type_().name(), true);
637                TypeMapping::from_source(&source, rivet)
638            })
639            .collect();
640        Ok(mappings)
641    }
642
643    /// Governor pressure proxy: `pg_stat_bgwriter.checkpoints_req` — the same
644    /// monotonic counter the adaptive batch loop samples. Rising between samples
645    /// means the source is checkpointing harder under write pressure.
646    fn sample_pressure(&mut self) -> Option<u64> {
647        pg_sample_checkpoints_req(&mut self.client).map(|v| v.max(0) as u64)
648    }
649}
650
651/// When the query is a single-table `SELECT … FROM rel` (no joins, no subquery
652/// in `FROM`), PostgreSQL result metadata does not carry `NUMERIC` typmod, but
653/// `information_schema` / the table DDL does. We resolve the base relation with
654/// a small parser and fetch declared precision/scale so `rivet init`-style
655/// exports work without hand-written `columns:` overrides.
656fn pg_numeric_catalog_hints_opt(
657    client: &mut Client,
658    query: &str,
659) -> Option<HashMap<String, (u8, i8)>> {
660    match pg_fetch_numeric_catalog_hints(client, query) {
661        Ok(m) => m,
662        Err(e) => {
663            // Reaching this arm means the parser identified a single-table query
664            // and we tried catalog lookup, but the lookup itself failed. That is
665            // unexpected (not "this query has a JOIN"), so surface it — otherwise
666            // a downstream NUMERIC mapping failure looks like a config problem
667            // when the real cause is here.
668            log::warn!(
669                "PG numeric catalog lookup failed — NUMERIC columns will require explicit `columns:` overrides: {e}"
670            );
671            None
672        }
673    }
674}
675
676fn pg_fetch_numeric_catalog_hints(
677    client: &mut Client,
678    query: &str,
679) -> crate::error::Result<Option<HashMap<String, (u8, i8)>>> {
680    let Some(regclass_lit) = try_parse_pg_simple_from_regclass_literal(query) else {
681        return Ok(None);
682    };
683    let locate_sql = "SELECT n.nspname::text, c.relname::text \
684         FROM pg_catalog.pg_class c \
685         JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace \
686         WHERE c.oid = ($1::text)::regclass";
687    let row_opt = match client.query_opt(locate_sql, &[&regclass_lit]) {
688        Ok(r) => r,
689        Err(e) => {
690            log::warn!("PG numeric catalog: '{regclass_lit}' regclass lookup failed: {e}");
691            return Ok(None);
692        }
693    };
694    let Some(row) = row_opt else {
695        return Ok(None);
696    };
697    let schema: String = row.get(0);
698    let table: String = row.get(1);
699    let rows = client.query(
700        "SELECT column_name::text, data_type::text, numeric_precision, numeric_scale \
701             FROM information_schema.columns \
702             WHERE table_schema = $1 AND table_name = $2 \
703             ORDER BY ordinal_position",
704        &[&schema, &table],
705    )?;
706
707    let mut map = HashMap::new();
708    for row in rows {
709        let col: String = row.get(0);
710        let dt: String = row.get(1);
711        if !is_pg_numeric_information_type(&dt) {
712            continue;
713        }
714        let p: Option<i32> = row.get(2);
715        let s: Option<i32> = row.get(3);
716        if let (Some(p), Some(s)) = (p, s)
717            && let Some(pair) = catalog_numeric_to_decimal_params(p, s)
718        {
719            map.insert(col, pair);
720        }
721    }
722
723    if map.is_empty() {
724        Ok(None)
725    } else {
726        log::debug!(
727            "PG numeric catalog: resolved {} DECIMAL/NUMERIC column(s) for relation {regclass_lit}",
728            map.len(),
729        );
730        Ok(Some(map))
731    }
732}
733
/// True when an `information_schema` `data_type` string denotes NUMERIC or
/// DECIMAL, with or without a parenthesised `(p[,s])` suffix. Surrounding
/// whitespace and ASCII case are ignored.
fn is_pg_numeric_information_type(dt: &str) -> bool {
    let lowered = dt.trim().to_ascii_lowercase();
    ["numeric", "decimal"].iter().any(|base| {
        lowered
            .strip_prefix(base)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('('))
    })
}
740
/// Validate catalog `(precision, scale)` and narrow them to the decimal
/// parameter types, using the same bounds as Rivet YAML `decimal(p,s)`
/// overrides.
///
/// Accepts precision in `1..=76`, scale within the `i8` range and not larger
/// than the precision. Anything else yields `None`.
fn catalog_numeric_to_decimal_params(precision: i32, scale: i32) -> Option<(u8, i8)> {
    if !(1..=76).contains(&precision) {
        return None;
    }
    let narrow_precision = u8::try_from(precision).ok()?;
    let narrow_scale = i8::try_from(scale).ok()?;
    if scale > precision {
        return None;
    }
    Some((narrow_precision, narrow_scale))
}
756
#[cfg(test)]
mod tests {
    use super::{catalog_numeric_to_decimal_params, parse_work_mem};

    // FROM-clause parser tests live in `from_parse.rs` alongside the parser.

    /// Precision must be in range and scale must not exceed precision.
    #[test]
    fn catalog_decimal_bounds() {
        assert_eq!(catalog_numeric_to_decimal_params(18, 2), Some((18, 2)));
        for (precision, scale) in [(0, 2), (77, 0), (18, 19)] {
            assert!(
                catalog_numeric_to_decimal_params(precision, scale).is_none(),
                "({precision}, {scale}) should be rejected"
            );
        }
    }

    /// Postgres SHOW work_mem normally returns "<N>kB", "<N>MB", "<N>GB".
    /// A bare integer is interpreted as kB (matches postgresql.conf parsing).
    #[test]
    fn parse_work_mem_handles_pg_units() {
        const KIB: i64 = 1024;
        let accepted = [
            ("4MB", 4 * KIB * KIB),
            ("16384kB", 16384 * KIB),
            ("1GB", KIB * KIB * KIB),
            ("  4MB  ", 4 * KIB * KIB),
            ("4mb", 4 * KIB * KIB),
            ("65536", 65536 * KIB),
        ];
        for (raw, bytes) in accepted {
            assert_eq!(parse_work_mem(raw), Some(bytes), "input {raw:?}");
        }
        // Empty, non-numeric, and units PG would never emit for work_mem
        // (e.g. seconds) are all rejected.
        for raw in ["", "garbage", "4s"] {
            assert_eq!(parse_work_mem(raw), None, "input {raw:?}");
        }
    }
}
787}