taxa-sql 0.1.0

taxa SqlSource: ingest a Postgres query into an in-memory Polars DataFrame (a taxa_core::Source).
//! `SqlSource` — a [`taxa_core::Source`] backed by a one-shot Postgres query
//! ingested into an in-memory Polars `DataFrame`.
//!
//! The heavy `postgres` / `rust_decimal` dependencies live here, NOT in
//! `taxa-core`. v1 is "ingest once at build time": [`SqlSource::connect`] runs
//! the query, decodes every column into a Polars `Series`, and caches the
//! resulting `DataFrame`. This fits taxa's "build the source before the tokio
//! runtime" rule. Refresh = reconstruct (or [`SqlSource::reload`]).
//!
//! Per-column type dispatch keyed on `postgres::types::Type` builds each
//! `Series` directly (no Arrow round-trip). NUMERIC is decoded via
//! `rust_decimal::Decimal` → `f64`. Temporal columns (DATE / TIMESTAMP /
//! TIMESTAMPTZ) become Utf8 ISO strings in v1 — taxa's series engine casts
//! string→Date downstream.

use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use polars::prelude::*;
use postgres::types::Type;
use postgres::{Client, NoTls, Row};
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use std::collections::HashSet;

use taxa_core::error::{Error, Result};
use taxa_core::source::Source;

/// A [`Source`] whose data is a Postgres query result ingested eagerly into an
/// in-memory Polars `DataFrame`.
///
/// The connection string and SQL text are stored next to the cached frame so
/// that [`SqlSource::reload`] can re-run the same query later.
pub struct SqlSource {
    /// Cached result of the most recent successful ingest.
    df: DataFrame,
    /// Connection string given to `connect`; reused by `reload`.
    dsn: String,
    /// SQL text given to `connect`; reused by `reload`.
    query: String,
}

impl SqlSource {
    /// Connect to `dsn`, run `query` once, and cache the resulting `DataFrame`.
    ///
    /// The sync `postgres` crate needs an explicit host in the DSN (e.g.
    /// `host=/tmp dbname=investing`), not a `postgresql:///investing` URI.
    pub fn connect(dsn: &str, query: &str) -> Result<Self> {
        let df = Self::ingest(dsn, query)?;
        Ok(SqlSource {
            df,
            dsn: dsn.to_string(),
            query: query.to_string(),
        })
    }

    /// Re-run the original query and replace the cached frame.
    pub fn reload(&mut self) -> Result<()> {
        self.df = Self::ingest(&self.dsn, &self.query)?;
        Ok(())
    }

    /// The number of rows in the cached frame.
    pub fn height(&self) -> usize {
        self.df.height()
    }

    fn ingest(dsn: &str, query: &str) -> Result<DataFrame> {
        let mut client = Client::connect(dsn, NoTls)
            .map_err(|e| Error::Engine(format!("postgres connect failed ({dsn}): {e}")))?;
        // Prepare first so the column schema (names + types) is known even when
        // the query returns zero rows — an empty result must still be a typed,
        // 0-row frame, not a 0x0 frame (which loses columns the series view needs).
        let stmt = client
            .prepare(query)
            .map_err(|e| Error::Engine(format!("postgres prepare failed: {e}")))?;
        let schema: Vec<(String, Type)> = stmt
            .columns()
            .iter()
            .map(|c| (c.name().to_string(), c.type_().clone()))
            .collect();
        let rows = client
            .query(&stmt, &[])
            .map_err(|e| Error::Engine(format!("postgres query failed: {e}")))?;
        build_dataframe(&schema, &rows)
    }
}

impl Source for SqlSource {
    fn frame(&self) -> Result<LazyFrame> {
        Ok(self.df.clone().lazy())
    }

    fn columns(&self) -> Result<HashSet<String>> {
        Ok(self
            .df
            .get_column_names()
            .into_iter()
            .map(|s| s.to_string())
            .collect())
    }

    fn schema(&self) -> Result<Vec<(String, String)>> {
        Ok(self
            .df
            .iter()
            .map(|s| (s.name().to_string(), format!("{}", s.dtype())))
            .collect())
    }
}

/// Build a Polars `DataFrame` from a slice of Postgres `Row`s via per-column
/// type dispatch on `postgres::types::Type`, using `schema` (name+type per
/// column, taken from the prepared statement) to drive the dispatch.
///
/// An empty result set still yields a typed, 0-row frame: the schema comes from
/// the prepared statement's column metadata (available without any rows), so we
/// emit one typed 0-row `Series` per column. (Previously a 0-row result returned
/// a 0x0 frame, which dropped every column and broke the series view.)
fn build_dataframe(schema: &[(String, Type)], rows: &[Row]) -> Result<DataFrame> {
    let mut series: Vec<Column> = Vec::with_capacity(schema.len());
    for (idx, (name, ty)) in schema.iter().enumerate() {
        // `decode_column` builds empty typed vecs when `rows` is empty, so this
        // path produces a typed 0-row Series per column for an empty result.
        series.push(decode_column(rows, idx, name, ty)?.into_column());
    }

    DataFrame::new(series).map_err(|e| Error::Engine(format!("dataframe build failed: {e}")))
}

/// Decode one column (by index) across all rows into a typed Polars `Series`,
/// dispatching on the Postgres column type. NULLs become `None`.
fn decode_column(rows: &[Row], idx: usize, name: &str, ty: &Type) -> Result<Series> {
    let pl_name: PlSmallStr = name.into();
    let n = rows.len();

    macro_rules! collect {
        ($t:ty) => {{
            let mut v: Vec<Option<$t>> = Vec::with_capacity(n);
            for row in rows {
                v.push(row.get::<usize, Option<$t>>(idx));
            }
            v
        }};
    }

    let series = match *ty {
        Type::BOOL => Series::new(pl_name, collect!(bool)),
        Type::INT2 => Series::new(pl_name, collect!(i16)),
        Type::INT4 => Series::new(pl_name, collect!(i32)),
        Type::INT8 => Series::new(pl_name, collect!(i64)),
        Type::FLOAT4 => Series::new(pl_name, collect!(f32)),
        Type::FLOAT8 => Series::new(pl_name, collect!(f64)),
        Type::NUMERIC => {
            // Decimal -> f64 (Float64 Series). v1 trades exactness for a dtype
            // the engine handles natively.
            let mut v: Vec<Option<f64>> = Vec::with_capacity(n);
            for row in rows {
                let dec: Option<Decimal> = row.get(idx);
                v.push(dec.and_then(|d| d.to_f64()));
            }
            Series::new(pl_name, v)
        }
        Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME => {
            Series::new(pl_name, collect!(String))
        }
        // TODO: emit real Date/Datetime dtypes once the series view is wired;
        // v1 surfaces temporal columns as Utf8 ISO strings (the engine casts
        // string->Date downstream).
        Type::DATE => {
            let mut v: Vec<Option<String>> = Vec::with_capacity(n);
            for row in rows {
                let d: Option<NaiveDate> = row.get(idx);
                v.push(d.map(|d| d.format("%Y-%m-%d").to_string()));
            }
            Series::new(pl_name, v)
        }
        Type::TIMESTAMP => {
            let mut v: Vec<Option<String>> = Vec::with_capacity(n);
            for row in rows {
                let t: Option<NaiveDateTime> = row.get(idx);
                v.push(t.map(|t| t.format("%Y-%m-%dT%H:%M:%S%.f").to_string()));
            }
            Series::new(pl_name, v)
        }
        Type::TIMESTAMPTZ => {
            let mut v: Vec<Option<String>> = Vec::with_capacity(n);
            for row in rows {
                let t: Option<DateTime<Utc>> = row.get(idx);
                v.push(t.map(|t| t.to_rfc3339()));
            }
            Series::new(pl_name, v)
        }
        ref other => {
            return Err(Error::Engine(format!(
                "taxa-sql: column '{name}' has unsupported Postgres type '{other}'. \
                 Cast it in the SQL query (e.g. `{name}::text` or `{name}::float8`)."
            )));
        }
    };

    Ok(series)
}

#[cfg(test)]
mod tests {
    use super::*;

    // Exercises the zero-row path without a database: given a known schema and
    // no rows, every column must survive as a typed, empty Series. Collapsing
    // to a 0x0 frame would break the downstream series view.
    #[test]
    fn empty_rows_preserve_schema() {
        let no_rows: Vec<Row> = Vec::new();
        let schema = vec![
            (String::from("symbol"), Type::TEXT),
            (String::from("dt"), Type::DATE),
            (String::from("mcap"), Type::FLOAT8),
            (String::from("n"), Type::INT8),
        ];

        let df = build_dataframe(&schema, &no_rows).expect("typed empty frame");
        assert_eq!(df.shape(), (0, 4), "0 rows but all 4 columns preserved");

        let names: Vec<&str> = df
            .get_column_names()
            .into_iter()
            .map(|s| s.as_str())
            .collect();
        assert_eq!(names, ["symbol", "dt", "mcap", "n"]);

        // v1 dtypes: DATE and TEXT are String, FLOAT8 is Float64, INT8 is Int64.
        let dtype_of = |col: &str| df.column(col).unwrap().dtype().clone();
        assert_eq!(dtype_of("symbol"), DataType::String);
        assert_eq!(dtype_of("dt"), DataType::String);
        assert_eq!(dtype_of("mcap"), DataType::Float64);
        assert_eq!(dtype_of("n"), DataType::Int64);
    }

    // DB-free tripwire covering the whole Postgres->Polars dtype table that
    // monocle's SQL sources rely on. It is not a guardrail: when a mapping is
    // changed on purpose, update the table below. Its job is to turn red on an
    // *accidental* change (say NUMERIC no longer Float64, or a temporal type no
    // longer a Utf8 ISO string) before a downstream frame is silently
    // corrupted. Keep it in step with the dispatch in `decode_column`.
    #[test]
    fn postgres_to_polars_dtype_contract() {
        let no_rows: Vec<Row> = Vec::new();
        let single_column = |pg: &Type| vec![(String::from("c"), pg.clone())];

        let cases: &[(Type, DataType)] = &[
            (Type::BOOL, DataType::Boolean),
            (Type::INT2, DataType::Int16),
            (Type::INT4, DataType::Int32),
            (Type::INT8, DataType::Int64),
            (Type::FLOAT4, DataType::Float32),
            (Type::FLOAT8, DataType::Float64),
            // v1 contract: Decimal is narrowed to f64.
            (Type::NUMERIC, DataType::Float64),
            (Type::TEXT, DataType::String),
            (Type::VARCHAR, DataType::String),
            (Type::BPCHAR, DataType::String),
            (Type::NAME, DataType::String),
            // Temporal types are Utf8 ISO strings; the engine casts downstream.
            (Type::DATE, DataType::String),
            (Type::TIMESTAMP, DataType::String),
            (Type::TIMESTAMPTZ, DataType::String),
        ];

        for (pg, expected) in cases.iter() {
            let df = build_dataframe(&single_column(pg), &no_rows).expect("typed empty frame");
            let actual = df.column("c").unwrap().dtype();
            assert_eq!(actual, expected, "Postgres {pg:?} must map to {expected:?}");
        }

        // A type with no mapping has to come back as a clear error carrying the
        // cast-in-SQL hint; it must not panic or quietly drop the column.
        let err = build_dataframe(&single_column(&Type::JSON), &no_rows)
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("unsupported"),
            "unsupported type errors clearly: {err}"
        );
    }
}