postgres_to_polars 1.0.2

Stream PostgreSQL query results directly into Polars DataFrames via sqlx.
Documentation
use polars::prelude::SchemaExt;
use postgres_to_polars::{IntoDataFrame, StreamToDataFrame};
use sqlx::PgPool;

// Row type with a single non-optional `id` field.
// The tests in this file use it as the target type for queries that select only `id`.
#[derive(sqlx::FromRow, IntoDataFrame)]
struct UserRow {
    // Declared as a plain `i32`, i.e. the row mapping does not allow a missing value here.
    id: i32,
}

// Row type with a single optional integer field named `val`.
// Used in this file as the target type for a query that echoes a bound parameter as `val`.
#[derive(sqlx::FromRow, IntoDataFrame)]
struct ValRow {
    // NOTE(review): presumably `Option` because `query_as!` treats a computed column as
    // nullable — confirm against the sqlx macro documentation.
    val: Option<i32>,
}

// Row type with a single optional 64-bit field named `count`.
// Used in this file as the target type for a `COUNT(*) as count` aggregate query.
#[derive(sqlx::FromRow, IntoDataFrame)]
struct CountRow {
    // NOTE(review): presumably `Option` because `query_as!` infers aggregate expressions as
    // nullable — confirm against the sqlx macro documentation.
    count: Option<i64>,
}

// Row type with four fields: a non-optional `id` plus three optional text fields.
// Used in this file as the target type for a query selecting
// `id, first_name, last_name, email` from `users`.
#[derive(sqlx::FromRow, IntoDataFrame)]
struct UserFullRow {
    id: i32,
    // The three text fields are optional, so a row may carry no value for any of them.
    first_name: Option<String>,
    last_name: Option<String>,
    email: Option<String>,
}

// Selects at most ten `id` values from `users`, converts the stream into a DataFrame and
// checks the frame's dimensions: no more than ten rows, exactly one column.
#[sqlx::test]
async fn test_simple_query(pool: PgPool) {
    let outcome = sqlx::query_as!(UserRow, "SELECT id FROM users LIMIT 10")
        .fetch(&pool)
        .to_dataframe(10)
        .await;
    let frame = outcome.expect("Query failed");

    let row_count = frame.height();
    assert!(row_count <= 10, "Should have at most 10 rows");

    let column_count = frame.width();
    assert_eq!(column_count, 1);
}

// Runs a `COUNT(*)` aggregate over `users` and checks that the resulting DataFrame
// contains exactly one row.
#[sqlx::test]
async fn test_count_query(pool: PgPool) {
    let outcome = sqlx::query_as!(CountRow, "SELECT COUNT(*) as count FROM users")
        .fetch(&pool)
        .to_dataframe(1)
        .await;

    // Same panic text as `expect("Query failed")`: the message followed by the debug-formatted error.
    let frame = match outcome {
        Ok(frame) => frame,
        Err(err) => panic!("Query failed: {:?}", err),
    };

    let row_count = frame.height();
    assert_eq!(row_count, 1, "Should have 1 row");
}

// Looks up a user by a bound `id` parameter and checks that the resulting DataFrame
// holds no more than one row.
#[sqlx::test]
async fn test_query_with_params(pool: PgPool) {
    let fetched = sqlx::query_as!(UserRow, "SELECT id FROM users WHERE id = $1", 1i32)
        .fetch(&pool)
        .to_dataframe(1)
        .await;
    let frame = fetched.expect("Query failed");

    let matched_rows = frame.height();
    assert!(matched_rows <= 1, "Should have at most 1 row");
}

// Runs the same parameterized statement twice with different bind values; both runs must
// succeed.
// NOTE(review): the second run is presumably meant to exercise sqlx's prepared-statement
// cache; nothing here pins both queries to the same connection — confirm that is intended.
#[sqlx::test]
async fn test_prepared_statement_cache(pool: PgPool) {
    // `expect` instead of `assert!(result.is_ok(), ..)`: the underlying error is included in
    // the panic output, which matches how the other tests in this file report failures.
    let _first = sqlx::query_as!(UserRow, "SELECT id FROM users WHERE id = $1", 1i32)
        .fetch(&pool)
        .to_dataframe(1)
        .await
        .expect("First query should succeed");

    let _second = sqlx::query_as!(UserRow, "SELECT id FROM users WHERE id = $1", 2i32)
        .fetch(&pool)
        .to_dataframe(1)
        .await
        .expect("Second query should succeed (cached statement)");
}

// Checks that a query against a table that does not exist surfaces as an `Err` from the
// DataFrame conversion instead of succeeding.
#[sqlx::test]
async fn test_error_handling(pool: PgPool) {
    // The runtime-checked `query_as::<_, T>` function is used on purpose: the `query_as!`
    // macro would reject this statement at compile time because the table doesn't exist.
    const MISSING_TABLE_SQL: &str = "SELECT id FROM table_qui_nexiste_pas";

    let outcome = sqlx::query_as::<_, UserRow>(MISSING_TABLE_SQL)
        .fetch(&pool)
        .to_dataframe_default()
        .await;

    let failed = outcome.is_err();
    assert!(failed, "Query should have failed");
}

// Selects four columns from `users` (at most five rows) and checks that the resulting
// DataFrame has no more than five rows and exposes every selected column in its schema.
#[sqlx::test]
async fn test_multiple_columns(pool: PgPool) {
    // Single source of truth for the expected columns; both the width check and the
    // per-column schema checks are derived from it. Must match the SELECT list below.
    const EXPECTED_COLUMNS: [&str; 4] = ["id", "first_name", "last_name", "email"];

    let df = sqlx::query_as!(
        UserFullRow,
        "SELECT id, first_name, last_name, email FROM users LIMIT 5"
    )
    .fetch(&pool)
    .to_dataframe(5)
    .await
    .expect("Query failed");

    assert!(df.height() <= 5, "Should have at most 5 rows");
    assert_eq!(
        df.width(),
        EXPECTED_COLUMNS.len(),
        "Unexpected number of columns"
    );

    let schema = df.schema();
    for name in EXPECTED_COLUMNS {
        // Name the missing column so a failure points at the exact field.
        assert!(
            schema.get_field(name).is_some(),
            "Column `{}` is missing from the DataFrame schema",
            name
        );
    }
}

// Runs a query whose `WHERE FALSE` filter matches nothing and checks that the conversion
// still succeeds and yields a DataFrame with zero rows.
#[sqlx::test]
async fn test_empty_result(pool: PgPool) {
    let frame = sqlx::query_as!(UserRow, "SELECT id FROM users WHERE FALSE")
        .fetch(&pool)
        .to_dataframe_default()
        .await
        // Same panic text as `expect("Query failed")`.
        .unwrap_or_else(|err| panic!("Query failed: {:?}", err));

    let row_count = frame.height();
    assert_eq!(row_count, 0, "Should have 0 rows");
}

// Echoes a bound integer parameter back as column `val` and checks that the single
// resulting cell renders as "42".
#[sqlx::test]
async fn test_parameterized_int(pool: PgPool) {
    let outcome = sqlx::query_as!(ValRow, "SELECT $1::int as val", 42i32)
        .fetch(&pool)
        .to_dataframe(1)
        .await;
    let frame = outcome.expect("Query failed");

    let row_count = frame.height();
    assert_eq!(row_count, 1);

    // The cell is compared through its string rendering, not as a typed value.
    let rendered = frame.column("val").unwrap().get(0).unwrap().to_string();
    assert_eq!(rendered, "42");
}