faucet-source-postgres 1.0.0

PostgreSQL query source connector for the faucet-stream ecosystem

faucet-source-postgres


A PostgreSQL query source that executes SQL queries and returns rows as JSON records, with connection pooling via sqlx.

Part of the faucet-stream ecosystem.

Installation

[dependencies]
faucet-source-postgres = "1.0"
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

faucet-stream = { version = "1.0", features = ["source-postgres"] }

Quick Start

use faucet_source_postgres::{PostgresSource, PostgresSourceConfig};
use faucet_core::Source;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = PostgresSourceConfig::new(
        "postgres://user:password@localhost:5432/mydb",
        "SELECT id, name, email FROM users WHERE active = true",
    );

    let source = PostgresSource::new(config).await?;
    let records = source.fetch_all().await?;

    for record in &records {
        println!("{}", record);
    }
    Ok(())
}

Configuration

PostgresSourceConfig

| Field | Type | Default | Description |
|---|---|---|---|
| connection_url | String | (required) | PostgreSQL connection URL (e.g. postgres://user:pass@host:5432/db). Masked in debug output for security. |
| query | String | (required) | SQL query to execute. |
| params | Vec<Value> | [] | Positional bind parameters for the query ($1, $2, etc.). |
| max_connections | u32 | 10 | Maximum number of connections in the pool. |
| batch_size | usize | 1000 | Rows per StreamPage emitted by Source::stream_pages. See Streaming and batching below. |
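The table notes that connection_url is masked in debug output, but not what the masked form looks like. One common way to do this is a hand-written Debug impl that swaps the password for a placeholder. The std-only sketch below assumes that approach; MaskedUrl and mask_password are hypothetical names, not the crate's types:

```rust
use std::fmt;

/// Hypothetical newtype: prints the URL with its password hidden.
struct MaskedUrl(String);

impl fmt::Debug for MaskedUrl {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", mask_password(&self.0))
    }
}

/// Replaces the password in `scheme://user:password@host/...` with `****`.
/// URLs without a password in the authority are returned unchanged.
fn mask_password(url: &str) -> String {
    // Everything after "://" up to the first '/', '?' or '#' is the authority.
    let Some(scheme_end) = url.find("://").map(|i| i + 3) else {
        return url.to_string();
    };
    let rest = &url[scheme_end..];
    let authority_end = rest
        .find(|c: char| c == '/' || c == '?' || c == '#')
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];
    // The userinfo ("user:password") precedes the last '@' in the authority.
    let Some(at) = authority.rfind('@') else {
        return url.to_string();
    };
    let userinfo = &authority[..at];
    match userinfo.find(':') {
        Some(colon) => format!(
            "{}{}:****{}",
            &url[..scheme_end],
            &userinfo[..colon],
            &rest[at..]
        ),
        None => url.to_string(),
    }
}
```

Restricting the search to the authority keeps '@' or ':' characters in the path or query string from being mistaken for credentials.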

Streaming and batching

PostgresSource::stream_pages drives a sqlx row cursor (Query::fetch) without buffering the full result set. Rows are accumulated into a buffer of up to batch_size rows, and each full buffer is yielded as a StreamPage; the trailing partial page (if any) is yielded after the cursor drains.
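The accumulate-and-flush loop can be sketched with the standard library alone, assuming rows arrive from any iterator. stream_in_batches and its emit callback are hypothetical stand-ins for the sqlx cursor and for yielding a StreamPage; this is not the crate's API:

```rust
/// Hypothetical sketch: pull rows one at a time and flush them in pages of
/// at most `batch_size` rows. `emit` stands in for yielding a StreamPage.
fn stream_in_batches<T, I, F>(rows: I, batch_size: usize, mut emit: F)
where
    I: IntoIterator<Item = T>,
    F: FnMut(Vec<T>),
{
    // This sketch covers positive batch sizes only.
    assert!(batch_size > 0, "this sketch expects a positive batch size");
    let mut buffer = Vec::with_capacity(batch_size);
    for row in rows {
        buffer.push(row);
        if buffer.len() == batch_size {
            // Full page: hand it off and start an empty buffer, so at most
            // `batch_size` rows are held at once.
            emit(std::mem::replace(&mut buffer, Vec::with_capacity(batch_size)));
        }
    }
    // Trailing partial page, emitted only after the source is exhausted.
    if !buffer.is_empty() {
        emit(buffer);
    }
}
```

With 7 rows and batch_size = 3 this emits pages of 3, 3 and 1 rows; an empty result emits nothing.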

batch_size = 0 is the "no batching" sentinel: the cursor is drained completely and the entire result set is emitted in a single StreamPage, so client-side memory grows with the full result. Use it for small lookup tables, or for downstream sinks (SQL COPY, BigQuery load jobs, Snowflake stage uploads) that prefer one large request to many small ones. Values larger than MAX_BATCH_SIZE (1,000,000) are rejected by faucet_core::validate_batch_size.
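The batch_size rules above (0 means one page, anything above MAX_BATCH_SIZE is an error) fit in a small resolver. This is a hypothetical sketch: resolve_batch_size and Batching are illustrative names, and the real faucet_core::validate_batch_size may have a different signature and error type:

```rust
/// Upper bound stated in the docs for `batch_size`.
const MAX_BATCH_SIZE: usize = 1_000_000;

/// How rows should be grouped into pages.
#[derive(Debug, PartialEq)]
enum Batching {
    /// `batch_size = 0`: drain the whole cursor into one page.
    SinglePage,
    /// Flush a page every `n` rows.
    Every(usize),
}

/// Hypothetical helper mirroring the documented rules.
fn resolve_batch_size(batch_size: usize) -> Result<Batching, String> {
    match batch_size {
        0 => Ok(Batching::SinglePage),
        n if n > MAX_BATCH_SIZE => Err(format!(
            "batch_size {n} exceeds MAX_BATCH_SIZE ({MAX_BATCH_SIZE})"
        )),
        n => Ok(Batching::Every(n)),
    }
}
```

MAX_BATCH_SIZE itself is accepted, since only values larger than it are described as rejected.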

The Postgres query source has no incremental-replication mode, so every emitted page carries bookmark: None.

Note: for a plain SELECT, PostgreSQL sends every row of the result back in one uninterrupted response; without a server-side cursor, the client cannot ask the server to pause between batches. The streaming implementation therefore bounds client-side memory at O(batch_size) rows and lets the sink begin writing as soon as the first batch has been parsed off the wire, but it does not change what the server sends. True server-side cursor streaming (DECLARE ... CURSOR FOR ...) is tracked separately as a follow-up.

Supported Column Types

Columns are automatically converted to JSON values:

| PostgreSQL type | JSON type |
|---|---|
| json, jsonb | Native JSON value |
| text, varchar, char | string |
| int8, bigint | number (i64) |
| int4, integer | number (i32) |
| int2, smallint | number (i16) |
| float8, double precision | number (f64) |
| float4, real | number (f32) |
| bool, boolean | boolean |
| timestamptz | string (RFC 3339) |
| timestamp, date, time | string (ISO 8601) |
| uuid | string |
| numeric, decimal | string (exact precision preserved) |
| bytea | string (base64) |
| Any other type, or NULL | null |
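The bytea row is the least self-explanatory conversion. The table only says "base64"; the std-only sketch below assumes the standard RFC 4648 alphabet with = padding, and bytea_to_base64 is a hypothetical helper (the crate may well use a base64 library instead):

```rust
/// Standard base64 alphabet (RFC 4648). Assumption: the crate uses this
/// variant with '=' padding; the docs do not say which variant.
const BASE64_ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes raw bytea bytes as a padded base64 string.
fn bytea_to_base64(bytes: &[u8]) -> String {
    let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for chunk in bytes.chunks(3) {
        // Pack up to 3 input bytes into one 24-bit group (missing bytes = 0).
        let b0 = u32::from(chunk[0]);
        let b1 = u32::from(chunk.get(1).copied().unwrap_or(0));
        let b2 = u32::from(chunk.get(2).copied().unwrap_or(0));
        let group = (b0 << 16) | (b1 << 8) | b2;
        // Each output character encodes 6 bits of the group.
        let sextet = |shift: u32| BASE64_ALPHABET[((group >> shift) & 0x3F) as usize] as char;
        out.push(sextet(18));
        out.push(sextet(12));
        // Pad with '=' where the final chunk had fewer than 3 bytes.
        out.push(if chunk.len() > 1 { sextet(6) } else { '=' });
        out.push(if chunk.len() > 2 { sextet(0) } else { '=' });
    }
    out
}
```

For example, the five bytes of b"hello" encode to "aGVsbG8=".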

Config Loading

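use faucet_core::config::{load_json, load_env_file};
use faucet_source_postgres::PostgresSourceConfig;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load every field from a JSON file (see the example JSON config below).
    let _from_json: PostgresSourceConfig = load_json("config.json")?;
    // Or load from a .env file, reading variables prefixed with PG_SOURCE_.
    let _from_env: PostgresSourceConfig = load_env_file(".env", "PG_SOURCE")?;
    Ok(())
}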

Example JSON config

{
  "connection_url": "postgres://analytics:password@db.example.com:5432/warehouse",
  "query": "SELECT id, name, created_at, metadata FROM events WHERE created_at > $1::timestamptz ORDER BY created_at",
  "params": ["2025-01-01T00:00:00Z"],
  "max_connections": 5,
  "batch_size": 5000
}

Example .env file

PG_SOURCE_CONNECTION_URL=postgres://user:password@localhost:5432/mydb
PG_SOURCE_QUERY=SELECT * FROM users
PG_SOURCE_MAX_CONNECTIONS=10
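The .env keys appear to follow a PREFIX_FIELD convention, so PG_SOURCE_MAX_CONNECTIONS fills max_connections. A hypothetical std-only sketch of that key mapping follows; env_fields is an illustrative name, and the real load_env_file may handle quoting, comments, and type conversion differently:

```rust
use std::collections::HashMap;

/// Hypothetical sketch: collect `PREFIX_FIELD=value` lines into a map keyed
/// by the lowercase field name (e.g. `max_connections`).
fn env_fields(contents: &str, prefix: &str) -> HashMap<String, String> {
    let key_prefix = format!("{prefix}_");
    let mut fields = HashMap::new();
    for line in contents.lines() {
        let line = line.trim();
        // Skip blank lines and comments.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // Split on the first '=' only, so values may themselves contain '='.
        let Some((key, value)) = line.split_once('=') else { continue };
        // Keep only keys carrying our prefix; strip it and lowercase the rest.
        if let Some(field) = key.trim().strip_prefix(key_prefix.as_str()) {
            fields.insert(field.to_lowercase(), value.trim().to_string());
        }
    }
    fields
}
```

Values are kept as strings here; turning "10" into a u32 is left to whatever deserializes the config.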

Config Schema Introspection

use faucet_core::Source;
use faucet_source_postgres::PostgresSource;

let source = PostgresSource::new(config).await?;
let schema = source.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);

Examples

Simple query

use faucet_source_postgres::{PostgresSource, PostgresSourceConfig};
use faucet_core::Source;

let config = PostgresSourceConfig::new(
    "postgres://localhost/mydb",
    "SELECT id, name, email FROM customers ORDER BY id",
);
let source = PostgresSource::new(config).await?;
let customers = source.fetch_all().await?;

Parameterized query

use faucet_source_postgres::{PostgresSource, PostgresSourceConfig};
use faucet_core::Source;
use serde_json::json;

let config = PostgresSourceConfig::new(
    "postgres://localhost/mydb",
    "SELECT * FROM orders WHERE status = $1 AND total > $2",
)
.params(vec![json!("shipped"), json!(100.0)]);

let source = PostgresSource::new(config).await?;
let orders = source.fetch_all().await?;
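Because params are positional, a mismatch between the $N placeholders and the length of params only surfaces as a database error at query time. A cheap pre-flight check can catch it earlier. The helper below is a hypothetical, naive sketch (it does not skip string literals, comments, or dollar-quoted bodies) and is not part of the crate:

```rust
/// Hypothetical helper: returns the highest `$N` placeholder in `sql`
/// (0 if there are none), for comparison with `params.len()`.
fn highest_placeholder(sql: &str) -> usize {
    let bytes = sql.as_bytes();
    let mut highest = 0;
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'$' {
            // Read the run of ASCII digits following '$'.
            let start = i + 1;
            let mut end = start;
            while end < bytes.len() && bytes[end].is_ascii_digit() {
                end += 1;
            }
            if end > start {
                if let Ok(n) = sql[start..end].parse::<usize>() {
                    highest = highest.max(n);
                }
            }
            i = end;
        } else {
            i += 1;
        }
    }
    highest
}
```

For the orders query above this returns 2, matching the two values passed to .params(...).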

Custom connection pool size

use faucet_source_postgres::{PostgresSource, PostgresSourceConfig};
use faucet_core::Source;

let config = PostgresSourceConfig::new(
    "postgres://localhost/mydb",
    "SELECT * FROM large_table",
)
.with_max_connections(20);

let source = PostgresSource::new(config).await?;
let records = source.fetch_all().await?;

License

Licensed under MIT or Apache-2.0.