faucet-sink-postgres
PostgreSQL sink connector for the faucet-stream ecosystem.
Writes JSON records to a PostgreSQL table using either JSONB column mode (storing each record as a single jsonb value) or AutoMap mode (mapping top-level JSON keys directly to table columns). Uses connection pooling via sqlx and efficient multi-row INSERT statements.
write_batch accepts whatever slice the pipeline hands it. When batch_size > 0 and the slice is larger than batch_size, the sink re-chunks it internally and issues one multi-row INSERT per chunk. When batch_size = 0, the entire slice goes out as a single logical write, although AutoMap mode may still split it to stay under Postgres' bind-parameter limit. See Streaming and batching for the tradeoffs.
Installation
[dependencies]
faucet-sink-postgres = "1.0"
tokio = { version = "1", features = ["full"] }
Or via the umbrella crate:
faucet-stream = { version = "1.0", features = ["sink-postgres"] }
Quick Start
use ;
use Sink;
use json;
async
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| connection_url | String | (required) | PostgreSQL connection URL (e.g. postgres://user:pass@host:5432/db) |
| table_name | String | (required) | Target table name |
| schema | Option<String> | null | Schema (namespace) qualifying table_name. When set, both AutoMap column discovery and the INSERT target schema.table_name explicitly; when unset, the table resolves against the connection's search_path |
| column_mapping | PostgresColumnMapping | Jsonb { column: "data" } | How to map JSON records to table columns (see below) |
| batch_size | usize | 1000 | Maximum rows per multi-row INSERT. See Streaming and batching below |
| max_connections | u32 | 5 | Maximum number of connections in the connection pool |
The Debug implementation masks the connection_url with *** to prevent credential leakage in logs.
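One way to get that masking is a hand-written std::fmt::Debug impl that prints a fixed placeholder instead of the field's value. The sketch below assumes this approach; ExampleConfig is a hypothetical stand-in holding only two of the real fields, and the crate's actual impl may be structured differently.

```rust
use std::fmt;

/// Hypothetical stand-in for the sink config, reduced to two fields.
struct ExampleConfig {
    connection_url: String,
    table_name: String,
}

impl fmt::Debug for ExampleConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ExampleConfig")
            // Print a fixed placeholder: the real URL may embed a password.
            .field("connection_url", &"***")
            .field("table_name", &self.table_name)
            .finish()
    }
}

fn main() {
    let cfg = ExampleConfig {
        connection_url: "postgres://writer:s3cret@db.example.com:5432/analytics".to_string(),
        table_name: "raw_events".to_string(),
    };
    let printed = format!("{cfg:?}");
    assert!(!printed.contains("s3cret"));
    println!("{printed}");
}
```

This prints ExampleConfig { connection_url: "***", table_name: "raw_events" }; the password never reaches the formatter.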
Streaming and batching
The PostgreSQL sink re-chunks each incoming StreamPage according to batch_size, so individual multi-row INSERT statements stay a manageable size and never exceed Postgres' per-statement bind-parameter limit.

- batch_size > 0 (default 1000): the sink slices the incoming page into batch_size-row chunks and issues one multi-row INSERT per chunk. The recommended value is 1000, a common sweet spot for multi-row INSERT; larger chunks rarely add throughput. AutoMap mode binds one parameter per column per row, so the sink splits each chunk further until rows × columns fits within Postgres' 65 535 bind-parameter ceiling; a wide table therefore cannot cause the server to reject the statement, whatever batch_size is set to. JSONB mode binds a single jsonb[] array regardless of row count, so it needs no further split.
- batch_size = 0: the "no batching" sentinel. The entire upstream StreamPage is forwarded in a single logical write. Use this when the source already emits page sizes tuned for Postgres, for example a REST source with batch_size: 1000 feeding a JSONB table. AutoMap still sub-splits internally to respect the 65 535-parameter ceiling.
batch_size is purely a chunk-size knob: it does not change connection pooling, identifier quoting, or JSONB vs AutoMap behaviour.
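The splitting rules above can be modelled in a few lines of plain Rust. This is a minimal sketch, not the crate's code: Mode and plan_statements are hypothetical names, and the function only computes how many rows each INSERT would carry for a given page size, batch_size, and mapping mode. The nested loop mirrors the description: first split by batch_size, then split each chunk again if the bind-parameter ceiling requires it.

```rust
/// Postgres' per-statement bind-parameter ceiling, as cited above.
const MAX_BIND_PARAMS: usize = 65_535;

/// Hypothetical model of the two column-mapping modes.
#[derive(Clone, Copy)]
enum Mode {
    /// A single jsonb[] parameter per statement, whatever the row count.
    Jsonb,
    /// One parameter per bound column per row.
    AutoMap { columns: usize },
}

/// Returns the row count of each INSERT issued for a page of `total_rows`.
fn plan_statements(total_rows: usize, batch_size: usize, mode: Mode) -> Vec<usize> {
    // batch_size == 0 is the "no batching" sentinel: the page is one chunk.
    let chunk = if batch_size == 0 { total_rows } else { batch_size };
    // Rows a single statement may carry before hitting the parameter ceiling.
    let per_statement = match mode {
        Mode::Jsonb => usize::MAX,
        Mode::AutoMap { columns } => (MAX_BIND_PARAMS / columns.max(1)).max(1),
    };
    let mut sizes = Vec::new();
    let mut remaining = total_rows;
    while remaining > 0 {
        // Outer split: batch_size rows (or the whole page).
        let mut in_chunk = remaining.min(chunk);
        remaining -= in_chunk;
        // Inner split: only has an effect in AutoMap mode when
        // rows * columns would exceed the ceiling.
        while in_chunk > 0 {
            let n = in_chunk.min(per_statement);
            sizes.push(n);
            in_chunk -= n;
        }
    }
    sizes
}

fn main() {
    println!("{:?}", plan_statements(2500, 1000, Mode::Jsonb));
    println!("{:?}", plan_statements(1000, 1000, Mode::AutoMap { columns: 100 }));
    println!("{:?}", plan_statements(2000, 0, Mode::AutoMap { columns: 40 }));
}
```

With a 100-column AutoMap table, a 1000-row chunk becomes two statements of 655 and 345 rows, since 655 × 100 = 65 500 fits under the ceiling and 656 × 100 does not.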
Column Mapping (PostgresColumnMapping)
| Variant | Description |
|---|---|
| Jsonb { column } | Insert each record as a single jsonb column. The column name defaults to "data" but can be overridden. Uses PostgreSQL's unnest($1::jsonb[]) for efficient batch inserts. |
| AutoMap | Map top-level JSON keys directly to table columns. Column names are discovered from the PostgreSQL catalog, scoped (via to_regclass) to exactly the relation the INSERT targets: the configured schema if set, otherwise the search_path-resolved table. Only keys that match existing columns are inserted; extra keys are silently ignored. Records with no matching keys are skipped with a warning. |
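The AutoMap selection rules in the table, together with the union-of-keys behaviour described under How It Works, can be modelled with std collections. This is a sketch under stated assumptions: Record and map_records are hypothetical, values are assumed to be pre-rendered as text, and the real sink reads the column list from the catalog and logs a warning for skipped records rather than returning a count.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical record shape: top-level keys mapped to values already rendered as text.
type Record<'a> = HashMap<&'a str, &'a str>;

/// Returns (bound columns, one row of values per kept record, skipped-record count).
fn map_records<'a>(
    table_columns: &[&'a str],
    records: &[Record<'a>],
) -> (Vec<&'a str>, Vec<Vec<Option<&'a str>>>, usize) {
    let known: HashSet<&str> = table_columns.iter().copied().collect();
    // Records sharing no key with the table are skipped (the sink warns instead of counting).
    let (kept, skipped): (Vec<&Record<'a>>, Vec<&Record<'a>>) = records
        .iter()
        .partition(|r| r.keys().any(|k| known.contains(k)));
    // Union of matching keys across the batch, kept in declared table order;
    // keys that match no column are never looked at.
    let columns: Vec<&'a str> = table_columns
        .iter()
        .copied()
        .filter(|c| kept.iter().any(|r| r.contains_key(c)))
        .collect();
    // A record missing a bound column yields None, i.e. SQL NULL.
    let rows: Vec<Vec<Option<&'a str>>> = kept
        .iter()
        .map(|r| columns.iter().map(|c| r.get(c).copied()).collect::<Vec<_>>())
        .collect();
    (columns, rows, skipped.len())
}

fn main() {
    let table = ["user_id", "event", "timestamp", "amount"];
    let records: Vec<Record> = vec![
        HashMap::from([("user_id", "u1"), ("event", "click"), ("debug", "x")]),
        HashMap::from([("user_id", "u2"), ("amount", "9.99")]),
        HashMap::from([("unrelated", "y")]),
    ];
    let (columns, rows, skipped) = map_records(&table, &records);
    println!("{columns:?}\n{rows:?}\nskipped: {skipped}");
}
```

Here timestamp is left out because no record carries it, debug is ignored, and the third record is skipped; each remaining row holds None wherever its record lacks a bound column.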
Builder Methods
use ;
// JSONB mode with custom column name
let config = new
.column_mapping
.with_batch_size
.max_connections;
// AutoMap mode
let config = new
.column_mapping
.with_batch_size;
Config Loading
use ;
use PostgresSinkConfig;
// From a JSON file
let config: PostgresSinkConfig = load_json?;
// From an .env file with a prefix
let config: PostgresSinkConfig = load_env_file?;
Example JSON config (JSONB mode)
Example JSON config (AutoMap mode)
Example .env file
PG_SINK_CONNECTION_URL=postgres://writer:s3cret@db.example.com:5432/analytics
PG_SINK_TABLE_NAME=raw_events
PG_SINK_COLUMN_MAPPING='{"jsonb":{"column":"data"}}'
PG_SINK_BATCH_SIZE=1000
PG_SINK_MAX_CONNECTIONS=5
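Judging from the example above, each key appears to be formed by stripping the prefix and lowercasing the rest (PG_SINK_BATCH_SIZE becomes batch_size). The sketch below shows that mapping under this assumption only; env_to_fields is a hypothetical helper, not load_env_file itself, and it leaves values as strings without any type conversion.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: maps `PREFIX_FIELD=value` lines to lowercase field names.
/// Assumes prefix-strip + lowercase, inferred from the example; no type parsing.
fn env_to_fields(contents: &str, prefix: &str) -> BTreeMap<String, String> {
    let mut fields = BTreeMap::new();
    for line in contents.lines() {
        let line = line.trim();
        // Ignore blank lines and comments.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // Split on the first '=' only, so URLs and JSON values stay intact.
        let Some((key, value)) = line.split_once('=') else { continue };
        // Keys without the prefix belong to something else.
        let Some(field) = key.trim().strip_prefix(prefix) else { continue };
        // Remove one pair of surrounding single quotes (used for the JSON value).
        let value = value.trim();
        let value = value
            .strip_prefix('\'')
            .and_then(|v| v.strip_suffix('\''))
            .unwrap_or(value);
        fields.insert(field.to_ascii_lowercase(), value.to_string());
    }
    fields
}

fn main() {
    let env = "PG_SINK_TABLE_NAME=raw_events\n\
               PG_SINK_COLUMN_MAPPING='{\"jsonb\":{\"column\":\"data\"}}'\n\
               PG_SINK_BATCH_SIZE=1000\n";
    for (field, value) in env_to_fields(env, "PG_SINK_") {
        println!("{field} = {value}");
    }
}
```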
Config Schema Introspection
use Sink;
let sink = new.await?;
let schema = sink.config_schema;
println!;
Pipeline Usage
use Pipeline;
use ;
use ;
let source_config = new;
let source = new;
let sink_config = new
.column_mapping;
let sink = new.await?;
let result = new.run.await?;
println!;
Examples
JSONB mode -- store entire records in a single column
This mode is ideal for schemaless ingestion where you want to store raw JSON and query it later with PostgreSQL's JSONB operators.
-- Table schema
(
id SERIAL PRIMARY KEY,
data JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
let config = new
.column_mapping
.with_batch_size
.max_connections;
let sink = new.await?;
sink.write_batch.await?;
AutoMap mode -- map JSON keys to table columns
This mode automatically discovers column names from the table schema and maps matching JSON keys. Columns use SQL-injection-safe quoted identifiers.
-- Table schema
(
user_id TEXT,
event TEXT,
timestamp TIMESTAMPTZ,
amount NUMERIC
);
let config = new
.column_mapping
.with_batch_size
.max_connections;
let sink = new.await?;
let records = vec!;
sink.write_batch.await?;
Connection pooling for high-throughput workloads
let config = new
.max_connections
.with_batch_size;
let sink = new.await?;
How It Works
- A connection pool is created in PostgresSink::new() using sqlx::PgPool with the configured max_connections.
- write_batch() slices records into chunks of batch_size (or forwards the whole slice when batch_size = 0) and inserts each chunk using a single multi-row INSERT statement. In AutoMap mode a chunk may be split further to respect the 65 535 bind-parameter ceiling.
- In JSONB mode, inserts use INSERT INTO table (col) SELECT * FROM unnest($1::jsonb[]), so a whole chunk travels as one parameter.
- In AutoMap mode, each column's name and underlying type (udt_name) are queried from the PostgreSQL catalog, scoped via to_regclass to exactly the relation the INSERT targets (the configured schema, else the search_path-resolved table), so a same-named table in another schema can't pollute the column set. A multi-row INSERT INTO ... VALUES ($1::int4, $2::timestamptz), ... is built dynamically with a per-column cast, and each value is bound as text so the destination column's input function parses it: numbers, booleans, timestamps, and uuids land in their native column types, and json/jsonb columns receive JSON text. Values are deliberately not bound as jsonb, because a jsonb parameter cannot be assigned to typed columns such as the TIMESTAMPTZ and NUMERIC columns in the example above. The column set is the union of record keys across the batch (in declared table order), so a field present only in a later record is still written; a row missing a column binds SQL NULL.
- All identifiers (table names, column names) are quoted using quote_ident() to prevent SQL injection.
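The statement shapes described above can be reproduced with plain string building. In this sketch quote_ident, target, jsonb_insert, and automap_insert are hypothetical helpers, and the analytics/events names are illustrative. The quoting always wraps names in double quotes and doubles embedded quotes, whereas PostgreSQL's own quote_ident() quotes only when needed. Binding the actual values is left to the database driver.

```rust
/// Double-quote an identifier, doubling any embedded quotes.
/// (Always quotes, unlike PostgreSQL's quote_ident(), which quotes only when needed.)
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// `"schema"."table"` when a schema is configured, else just `"table"`.
fn target(schema: Option<&str>, table: &str) -> String {
    match schema {
        Some(s) => format!("{}.{}", quote_ident(s), quote_ident(table)),
        None => quote_ident(table),
    }
}

/// JSONB mode: the whole chunk is bound as one jsonb[] parameter.
fn jsonb_insert(schema: Option<&str>, table: &str, column: &str) -> String {
    format!(
        "INSERT INTO {} ({}) SELECT * FROM unnest($1::jsonb[])",
        target(schema, table),
        quote_ident(column)
    )
}

/// AutoMap mode: `columns` pairs each name with its catalog udt_name, and every
/// placeholder carries that cast so the server parses the text-bound value.
fn automap_insert(schema: Option<&str>, table: &str, columns: &[(&str, &str)], rows: usize) -> String {
    let names: Vec<String> = columns.iter().map(|(n, _)| quote_ident(n)).collect();
    let mut param = 0usize;
    let tuples: Vec<String> = (0..rows)
        .map(|_| {
            let casts: Vec<String> = columns
                .iter()
                .map(|(_, udt)| {
                    param += 1;
                    format!("${param}::{udt}")
                })
                .collect();
            format!("({})", casts.join(", "))
        })
        .collect();
    format!(
        "INSERT INTO {} ({}) VALUES {}",
        target(schema, table),
        names.join(", "),
        tuples.join(", ")
    )
}

fn main() {
    println!("{}", jsonb_insert(None, "raw_events", "data"));
    println!(
        "{}",
        automap_insert(Some("analytics"), "events", &[("user_id", "text"), ("amount", "numeric")], 2)
    );
}
```

The second call yields INSERT INTO "analytics"."events" ("user_id", "amount") VALUES ($1::text, $2::numeric), ($3::text, $4::numeric): parameters are numbered across rows, and each placeholder repeats its column's cast.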
License
Licensed under MIT or Apache-2.0.