faucet-source-postgres
A PostgreSQL query source that executes SQL queries and returns rows as JSON records, with connection pooling via sqlx.
Part of the faucet-stream ecosystem.
Installation
[dependencies]
faucet-source-postgres = "1.0"
tokio = { version = "1", features = ["full"] }
Or via the umbrella crate:
faucet-stream = { version = "1.0", features = ["source-postgres"] }
Quick Start
use ;
use Source;
async
Configuration
PostgresSourceConfig
| Field | Type | Default | Description |
|---|---|---|---|
| `connection_url` | `String` | (required) | PostgreSQL connection URL (e.g. postgres://user:pass@host:5432/db). Masked in debug output for security |
| `query` | `String` | (required) | SQL query to execute |
| `params` | `Vec<Value>` | `[]` | Bind parameters for the query (positional: $1, $2, etc.) |
| `max_connections` | `u32` | `10` | Maximum number of connections in the pool |
| `batch_size` | `usize` | `1000` | Rows per StreamPage emitted by Source::stream_pages. See Streaming and batching below |
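The defaults and the masked connection URL can be illustrated with a small stand-alone sketch. `ConfigSketch` is a hypothetical mirror of the table above, not the crate's own `PostgresSourceConfig`; the `params` element type and the `***` mask are assumptions made only for illustration.

```rust
use std::fmt;

/// Hypothetical mirror of the documented fields (not the crate's type).
pub struct ConfigSketch {
    pub connection_url: String,
    pub query: String,
    /// Stand-in for `Vec<Value>`; plain strings keep the sketch std-only.
    pub params: Vec<String>,
    pub max_connections: u32,
    pub batch_size: usize,
}

impl ConfigSketch {
    /// Required fields are arguments; the rest take the documented defaults.
    pub fn new(connection_url: &str, query: &str) -> Self {
        Self {
            connection_url: connection_url.to_string(),
            query: query.to_string(),
            params: Vec::new(),
            max_connections: 10,
            batch_size: 1000,
        }
    }
}

/// Manual `Debug` so credentials in the URL never reach logs.
impl fmt::Debug for ConfigSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ConfigSketch")
            .field("connection_url", &"***") // assumed mask text
            .field("query", &self.query)
            .field("params", &self.params)
            .field("max_connections", &self.max_connections)
            .field("batch_size", &self.batch_size)
            .finish()
    }
}
```

Writing `Debug` by hand instead of deriving it is one common way to keep secrets out of `{:?}` output; the crate may achieve the same effect differently.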
Streaming and batching
PostgresSource::stream_pages drives a sqlx row cursor (Query::fetch)
without buffering the full result. Rows are accumulated into a batch_size
buffer and yielded as a StreamPage once the buffer fills; the trailing
partial page (if any) is yielded after the cursor drains.
batch_size = 0 is the "no batching" sentinel — the cursor is drained
completely and the entire result set is emitted in a single StreamPage.
Use it for small lookup tables, or for downstream sinks (SQL COPY,
BigQuery load jobs, Snowflake stage uploads) that prefer one large request
to many small ones. Values larger than MAX_BATCH_SIZE (1,000,000) are
rejected by faucet_core::validate_batch_size.
The postgres query source has no incremental-replication mode, so every
emitted page carries bookmark: None.
Note: Postgres' wire protocol sends rows from a simple SELECT in a single response (no server-side cursor). The streaming implementation bounds client-side memory at O(batch_size) and lets the sink begin writing as soon as the first batch is parsed off the wire. True server-side cursor streaming (DECLARE ... CURSOR FOR ...) is tracked separately as a follow-up.
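The batching rules described in this section can be sketched without a database. The code below is an illustration under stated assumptions, not the crate's implementation: `Pages`, `pages`, and this local `validate_batch_size` are hypothetical names, rows come from an ordinary iterator rather than a sqlx cursor, and an empty result is assumed to produce no page at all.

```rust
/// Upper bound quoted in the text above.
pub const MAX_BATCH_SIZE: usize = 1_000_000;

/// Hypothetical validator mirroring the documented limit.
pub fn validate_batch_size(batch_size: usize) -> Result<usize, String> {
    if batch_size > MAX_BATCH_SIZE {
        Err(format!("batch_size {} exceeds {}", batch_size, MAX_BATCH_SIZE))
    } else {
        Ok(batch_size)
    }
}

/// Lazily groups rows into pages of at most `batch_size` rows.
/// `batch_size == 0` means "no batching": one page with every row.
pub struct Pages<I: Iterator> {
    rows: I,
    batch_size: usize,
    done: bool,
}

pub fn pages<I: Iterator>(rows: I, batch_size: usize) -> Pages<I> {
    Pages { rows, batch_size, done: false }
}

impl<I: Iterator> Iterator for Pages<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let mut page = Vec::new();
        for row in self.rows.by_ref() {
            page.push(row);
            // Yield a full page as soon as the buffer fills.
            if self.batch_size != 0 && page.len() == self.batch_size {
                return Some(page);
            }
        }
        // The row source is drained: emit the trailing partial page, if any.
        self.done = true;
        if page.is_empty() { None } else { Some(page) }
    }
}
```

With five rows and a `batch_size` of 2 this yields pages of 2, 2, and 1 rows; with a `batch_size` of 0 it yields a single page of 5. Only one page is held in memory at a time, which is the O(batch_size) bound the note refers to.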
Supported Column Types
Columns are automatically converted to JSON values:
| PostgreSQL Type | JSON Type |
|---|---|
| `json`, `jsonb` | Native JSON value |
| `text`, `varchar`, `char` | string |
| `int8`, `bigint` | number (i64) |
| `int4`, `integer` | number (i32) |
| `int2`, `smallint` | number (i16) |
| `float8`, `double precision` | number (f64) |
| `float4`, `real` | number (f32) |
| `bool`, `boolean` | boolean |
| `timestamptz` | string (RFC 3339) |
| `timestamp`, `date`, `time` | string (ISO-8601) |
| `uuid` | string |
| `numeric`, `decimal` | string (exact precision preserved) |
| `bytea` | string (base64) |
| Other / NULL | null |
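The reason `numeric` and `decimal` map to strings rather than JSON numbers can be shown with a short check. This is an illustrative sketch only; `survives_f64_round_trip` is a hypothetical helper, not part of the crate.

```rust
/// Returns true when `text` comes back unchanged after being parsed
/// as an f64 and printed again. Values that fail this check would be
/// altered if they were emitted as JSON numbers backed by f64.
pub fn survives_f64_round_trip(text: &str) -> bool {
    match text.parse::<f64>() {
        Ok(value) => value.to_string() == text,
        Err(_) => false,
    }
}
```

A short value such as `1.5` survives the round trip, while `12345678901234567890.123456789` loses digits and `0.10` loses its trailing zero (its declared scale). Emitting the original text as a string avoids both problems.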
Config Loading
use ;
use PostgresSourceConfig;
let config: PostgresSourceConfig = load_json?;
let config: PostgresSourceConfig = load_env_file?;
Example JSON config
Example .env file
PG_SOURCE_CONNECTION_URL=postgres://user:password@localhost:5432/mydb
PG_SOURCE_QUERY=SELECT * FROM users
PG_SOURCE_MAX_CONNECTIONS=10
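As a rough illustration of how variables like these could map onto config fields, the sketch below strips the `PG_SOURCE_` prefix and lowercases the remainder. The mapping is inferred from the example variable names, and `parse_env` is a hypothetical helper; the crate's actual env-file loader may behave differently.

```rust
use std::collections::HashMap;

/// Prefix used by the example variables above.
const PREFIX: &str = "PG_SOURCE_";

/// Parses `KEY=value` lines, keeping only keys that start with `PREFIX`
/// and mapping them to lowercase field names.
pub fn parse_env(contents: &str) -> HashMap<String, String> {
    contents
        .lines()
        .map(str::trim)
        // Skip blank lines and comments.
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        // Split on the first '=' only, so values may contain '=' or spaces.
        .filter_map(|line| line.split_once('='))
        .filter_map(|(key, value)| {
            key.trim()
                .strip_prefix(PREFIX)
                .map(|field| (field.to_lowercase(), value.trim().to_string()))
        })
        .collect()
}
```

Splitting on the first `=` only matters for values such as connection URLs with query strings (for example `?sslmode=require`), which contain `=` themselves.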
Config Schema Introspection
use Source;
let source = new.await?;
let schema = source.config_schema;
println!;
Examples
Simple query
use ;
use Source;
let config = new;
let source = new.await?;
let customers = source.fetch_all.await?;
Parameterized query
use ;
use Source;
use json;
let config = new
.params;
let source = new.await?;
let orders = source.fetch_all.await?;
Custom connection pool size
use ;
let config = new
.with_max_connections;
let source = new.await?;
let records = source.fetch_all.await?;
License
Licensed under MIT or Apache-2.0.