rivet-cli 0.9.2

Rivet: PostgreSQL/MySQL/SQL Server → Parquet/CSV (local, S3, GCS, Azure). Crate name rivet-cli; binary rivet.
Documentation
use crate::error::Result;

use super::{ColumnInfo, TableInfo};

/// Lists the names of every base table and view in `schema`, sorted by name.
///
/// Connects to `url` without TLS and reads `information_schema.tables`.
///
/// # Errors
///
/// Propagates any connection or query error from the `postgres` client.
pub(super) fn list_tables(url: &str, schema: &str) -> Result<Vec<String>> {
    const TABLES_SQL: &str =
        "SELECT table_name FROM information_schema.tables
         WHERE table_schema = $1 AND table_type IN ('BASE TABLE', 'VIEW')
         ORDER BY table_name";

    let mut conn = postgres::Client::connect(url, postgres::NoTls)?;
    let fetched = conn.query(TABLES_SQL, &[&schema])?;

    let mut names = Vec::with_capacity(fetched.len());
    for record in fetched {
        let table_name: String = record.get(0);
        names.push(table_name);
    }
    Ok(names)
}

/// Collects metadata for a single PostgreSQL table or view.
///
/// Opens a fresh connection to `url` (no TLS) and gathers:
/// - a row estimate from `pg_class.reltuples` (clamped to be non-negative),
/// - the total on-disk size, when available,
/// - the primary-key column names,
/// - per-column type, nullability and numeric precision/scale.
///
/// # Errors
///
/// Returns an error when the connection fails, when the row-estimate,
/// primary-key or column query fails, or when `information_schema.columns`
/// reports no columns for `schema.table`. A failure of the size query is
/// deliberately ignored and yields `total_bytes: None`.
pub(super) fn introspect(url: &str, schema: &str, table: &str) -> Result<TableInfo> {
    let mut client = postgres::Client::connect(url, postgres::NoTls)?;

    // Row estimate from pg_class (fast, no COUNT(*)).
    // A missing row or NULL value becomes 0; negative estimates are clamped to 0.
    let row_estimate: i64 = client
        .query_opt(
            "SELECT reltuples::bigint FROM pg_class
             WHERE relname = $1 AND relnamespace = (
                 SELECT oid FROM pg_namespace WHERE nspname = $2
             )",
            &[&table, &schema],
        )?
        .and_then(|row| row.get::<_, Option<i64>>(0))
        .unwrap_or(0)
        .max(0);

    // Physical size (heap + indexes); None for views and when privileges are missing.
    // Best-effort: a query error is swallowed on purpose via `.ok()`.
    let total_bytes: Option<i64> = client
        .query_opt(
            "SELECT pg_total_relation_size(c.oid)::bigint
             FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
             WHERE c.relname = $1 AND n.nspname = $2 AND c.relkind IN ('r','p','m')",
            &[&table, &schema],
        )
        .ok()
        .flatten()
        .and_then(|row| row.get::<_, Option<i64>>(0))
        .filter(|v| *v > 0);

    // Primary key columns
    // to_regclass() returns NULL (no error) when the table disappears between
    // list_tables and introspect — possible under concurrent test table drops.
    // A NULL never satisfies `=`, so the query then simply yields no rows.
    //
    // Both name parts go through quote_ident(): to_regclass() parses its input
    // like a SQL identifier, so an unquoted mixed-case or special-character name
    // would be case-folded / mis-parsed and the primary key silently missed.
    let pk_rows = client.query(
        "SELECT a.attname
         FROM pg_index i
         JOIN pg_attribute a ON a.attrelid = i.indrelid
             AND a.attnum = ANY(i.indkey)
         WHERE i.indrelid = to_regclass(quote_ident($1) || '.' || quote_ident($2))
           AND i.indisprimary",
        &[&schema, &table],
    )?;
    let pk_cols: std::collections::HashSet<String> =
        pk_rows.iter().map(|r| r.get::<_, String>(0)).collect();

    // Column metadata — including NULL-ability and numeric precision/scale for decimal columns.
    let col_rows = client.query(
        "SELECT column_name, data_type, is_nullable, numeric_precision, numeric_scale
         FROM information_schema.columns
         WHERE table_schema = $1 AND table_name = $2
         ORDER BY ordinal_position",
        &[&schema, &table],
    )?;

    if col_rows.is_empty() {
        anyhow::bail!(
            "Table '{schema}.{table}' not found or has no columns. \
             Check the table name and that the user has SELECT privilege."
        );
    }

    // Guarded i32 -> u32 conversion: a negative value becomes None instead of
    // wrapping around to a huge unsigned number as a bare `as` cast would.
    let non_negative = |value: Option<i32>| value.filter(|n| *n >= 0).map(|n| n as u32);

    let columns = col_rows
        .iter()
        .map(|row| {
            let name: String = row.get(0);
            let data_type: String = row.get(1);
            let is_nullable_str: String = row.get(2);
            let numeric_precision: Option<i32> = row.get(3);
            let numeric_scale: Option<i32> = row.get(4);
            let is_primary_key = pk_cols.contains(&name);
            ColumnInfo {
                name,
                data_type,
                is_primary_key,
                // information_schema reports nullability as the text 'YES' / 'NO'.
                is_nullable: is_nullable_str.eq_ignore_ascii_case("YES"),
                numeric_precision: non_negative(numeric_precision),
                numeric_scale: non_negative(numeric_scale),
            }
        })
        .collect();

    Ok(TableInfo {
        schema: schema.to_string(),
        table: table.to_string(),
        row_estimate,
        total_bytes,
        columns,
    })
}