faucet-sink-postgres
PostgreSQL sink connector for the faucet-stream ecosystem.
Writes JSON records to a PostgreSQL table using either JSONB column mode (storing each record as a single jsonb value) or AutoMap mode (mapping top-level JSON keys directly to table columns). Uses connection pooling via sqlx and efficient multi-row INSERT statements.
Installation
[]
= "0.1"
= { = "1", = ["full"] }
Or via the umbrella crate:
= { = "0.2", = ["sink-postgres"] }
Quick Start
use ;
use Sink;
use json;
async
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| `connection_url` | `String` | (required) | PostgreSQL connection URL (e.g. `postgres://user:pass@host:5432/db`) |
| `table_name` | `String` | (required) | Target table name |
| `column_mapping` | `PostgresColumnMapping` | `Jsonb { column: "data" }` | How to map JSON records to table columns (see below) |
| `batch_size` | `usize` | `500` | Maximum number of rows per INSERT statement |
| `max_connections` | `u32` | `5` | Maximum number of connections in the connection pool |
The Debug implementation masks the connection_url with *** to prevent credential leakage in logs.
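A minimal, standard-library-only sketch of the defaults in the table and the masked `Debug` output follows. `SinkConfigSketch` and `ColumnMappingSketch` are hypothetical stand-ins, not the crate's `PostgresSinkConfig` or `PostgresColumnMapping`. The sketch also assumes the entire URL is replaced by `***`, because the text above does not say whether only the password is hidden.

```rust
use std::fmt;

/// Hypothetical stand-in for the column-mapping setting; not the crate's type.
#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
enum ColumnMappingSketch {
    /// Store the whole record in one jsonb column.
    Jsonb { column: String },
    /// Map top-level keys to table columns.
    AutoMap,
}

/// Hypothetical config mirroring the fields in the table above.
#[allow(dead_code)]
struct SinkConfigSketch {
    connection_url: String,
    table_name: String,
    column_mapping: ColumnMappingSketch,
    batch_size: usize,
    max_connections: u32,
}

impl SinkConfigSketch {
    /// Required fields come from the caller; the others use the documented defaults.
    fn new(connection_url: &str, table_name: &str) -> Self {
        Self {
            connection_url: connection_url.to_string(),
            table_name: table_name.to_string(),
            column_mapping: ColumnMappingSketch::Jsonb {
                column: "data".to_string(),
            },
            batch_size: 500,
            max_connections: 5,
        }
    }
}

impl fmt::Debug for SinkConfigSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SinkConfigSketch")
            // Never print the real URL: it may embed a user name and password.
            .field("connection_url", &"***")
            .field("table_name", &self.table_name)
            .field("column_mapping", &self.column_mapping)
            .field("batch_size", &self.batch_size)
            .field("max_connections", &self.max_connections)
            .finish()
    }
}
```

Formatting a value with `{:?}` prints `connection_url: "***"` while every other field appears normally.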
Column Mapping (PostgresColumnMapping)
| Variant | Description |
|---|---|
| `Jsonb { column }` | Insert each record as a single `jsonb` value in one column. The column name defaults to `"data"` but can be overridden. Uses PostgreSQL's `unnest($1::jsonb[])` for efficient batch inserts. |
| `AutoMap` | Map top-level JSON keys directly to table columns. Column names are discovered from `information_schema.columns`. Only keys that match existing columns are inserted; extra keys are silently ignored. Records with no matching keys are skipped with a warning. |
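The AutoMap rules in the table (keep matching keys, ignore extra keys, skip records with no match) can be sketched with the standard library alone. `project_record` is a hypothetical helper, and a record is simplified to a map from top-level keys to already-serialized JSON values. The sketch assumes every discovered column gets a slot in the row, with a missing key becoming `None` (bound as NULL).

```rust
use std::collections::HashMap;

/// Hypothetical sketch of AutoMap row projection (not the crate's code).
/// `columns` stands in for names read from information_schema.columns.
/// Returns None when no key matches a column, i.e. the record is skipped.
fn project_record(
    columns: &[&str],
    record: &HashMap<String, String>,
) -> Option<Vec<Option<String>>> {
    // Extra keys are ignored simply because only column names are looked up.
    let row: Vec<Option<String>> = columns
        .iter()
        .map(|col| record.get(*col).cloned()) // missing key -> None (NULL)
        .collect();
    if row.iter().all(|v| v.is_none()) {
        eprintln!("warning: record has no keys matching table columns; skipping");
        None
    } else {
        Some(row)
    }
}
```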
Builder Methods
use ;
// JSONB mode with custom column name
let config = new
.column_mapping
.batch_size
.max_connections;
// AutoMap mode
let config = new
.column_mapping
.batch_size;
Config Loading
use ;
use PostgresSinkConfig;
// From a JSON file
let config: PostgresSinkConfig = load_json?;
// From an .env file with a prefix
let config: PostgresSinkConfig = load_env_file?;
Example JSON config (JSONB mode)
Example JSON config (AutoMap mode)
Example .env file
PG_SINK_CONNECTION_URL=postgres://writer:s3cret@db.example.com:5432/analytics
PG_SINK_TABLE_NAME=raw_events
PG_SINK_COLUMN_MAPPING='{"jsonb":{"column":"data"}}'
PG_SINK_BATCH_SIZE=500
PG_SINK_MAX_CONNECTIONS=5
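The `.env` example suggests that each field is read from `PG_SINK_` followed by the upper-cased field name. A minimal sketch of that convention follows. It assumes the prefix includes the trailing underscore and that one layer of surrounding quotes is stripped from values; `parse_prefixed_env` is a hypothetical helper, not the crate's `load_env_file`.

```rust
use std::collections::HashMap;

/// Hypothetical sketch of prefix-based .env loading: keep `KEY=value` lines
/// whose key starts with `prefix`, strip the prefix, lowercase the remainder
/// to get the field name, and drop one layer of surrounding quotes.
fn parse_prefixed_env(contents: &str, prefix: &str) -> HashMap<String, String> {
    let mut fields = HashMap::new();
    for line in contents.lines() {
        let line = line.trim();
        // Skip blank lines and comments.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // Split on the first '=' only, so values may contain '='.
        let Some((key, value)) = line.split_once('=') else { continue };
        let Some(field) = key.trim().strip_prefix(prefix) else { continue };
        let value = value.trim();
        let unquoted = value
            .strip_prefix('\'')
            .and_then(|v| v.strip_suffix('\''))
            .or_else(|| value.strip_prefix('"').and_then(|v| v.strip_suffix('"')))
            .unwrap_or(value);
        fields.insert(field.to_lowercase(), unquoted.to_string());
    }
    fields
}
```

Under these assumptions, `PG_SINK_COLUMN_MAPPING='{"jsonb":{"column":"data"}}'` yields the field `column_mapping` holding the JSON text without the single quotes.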
Config Schema Introspection
use Sink;
let sink = new.await?;
let schema = sink.config_schema;
println!;
Pipeline Usage
use Pipeline;
use ;
use ;
let source_config = new;
let source = new;
let sink_config = new
.column_mapping;
let sink = new.await?;
let result = new.run.await?;
println!;
Examples
JSONB mode -- store entire records in a single column
This mode is ideal for schemaless ingestion where you want to store raw JSON and query it later with PostgreSQL's JSONB operators.
-- Table schema
(
id SERIAL PRIMARY KEY,
data JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
let config = new
.column_mapping
.batch_size
.max_connections;
let sink = new.await?;
sink.write_batch.await?;
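For illustration, the statement JSONB mode sends and the way `batch_size` splits a batch can be sketched as below. `quote_ident_sketch`, `jsonb_insert_sql`, and `chunk_records` are hypothetical helpers, and records are assumed to be pre-serialized JSON strings. The quoting always wraps the whole name, so a schema-qualified name such as `public.raw_events` would need each part quoted separately.

```rust
/// Hypothetical helper: quote an identifier PostgreSQL-style by wrapping it
/// in double quotes and doubling any embedded double quote.
fn quote_ident_sketch(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Build the one-parameter JSONB insert: the whole chunk travels as $1::jsonb[].
fn jsonb_insert_sql(table: &str, column: &str) -> String {
    format!(
        "INSERT INTO {} ({}) SELECT * FROM unnest($1::jsonb[])",
        quote_ident_sketch(table),
        quote_ident_sketch(column)
    )
}

/// Split serialized records into chunks of at most `batch_size`; each chunk
/// would be bound as the single array parameter of one statement.
fn chunk_records(records: &[String], batch_size: usize) -> Vec<&[String]> {
    assert!(batch_size > 0, "batch_size must be at least 1");
    records.chunks(batch_size).collect()
}
```

With a `batch_size` of 500, a 1,200-record call would produce three chunks of 500, 500, and 200 records, each sent as one statement with a single `jsonb[]` parameter.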
AutoMap mode -- map JSON keys to table columns
This mode discovers column names from the table schema and maps each matching top-level JSON key to its column. Column names are written as quoted identifiers, so they cannot be used for SQL injection.
-- Table schema
(
user_id TEXT,
event TEXT,
timestamp TIMESTAMPTZ,
amount NUMERIC
);
let config = new
.column_mapping
.batch_size
.max_connections;
let sink = new.await?;
let records = vec!;
sink.write_batch.await?;
Connection pooling for high-throughput workloads
let config = new
.max_connections
.batch_size;
let sink = new.await?;
How It Works
- A connection pool is created in `PostgresSink::new()` using `sqlx::PgPool`, with at most the configured `max_connections`.
- `write_batch()` splits records into chunks of `batch_size` and inserts each chunk with a single multi-row INSERT statement.
- In JSONB mode, each chunk is inserted with `INSERT INTO table (col) SELECT * FROM unnest($1::jsonb[])`, so the whole chunk is sent as one array parameter regardless of its size.
- In AutoMap mode, column names are queried from `information_schema.columns`, and a multi-row `INSERT INTO ... VALUES ($1, $2), ($3, $4), ...` is built dynamically. Column values are bound as JSONB; keys missing from a record are bound as NULL.
- All identifiers (table names, column names) are quoted using `quote_ident()` to prevent SQL injection.
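The placeholder numbering in the AutoMap step can be made concrete with a small builder. `automap_insert_sql` and `quote_ident_sketch` are hypothetical, standard-library-only helpers. They produce only the SQL text (binding the values as JSONB is left to the database driver), and they assume placeholders are numbered row by row as in `($1, $2), ($3, $4)`.

```rust
/// Hypothetical helper: quote an identifier PostgreSQL-style by wrapping it
/// in double quotes and doubling any embedded double quote.
fn quote_ident_sketch(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Hypothetical builder for `INSERT INTO "t" ("a", "b") VALUES ($1, $2), ($3, $4), ...`.
/// Placeholders are numbered row by row: row r, column c uses $(r * width + c + 1).
fn automap_insert_sql(table: &str, columns: &[&str], rows: usize) -> String {
    assert!(!columns.is_empty() && rows > 0, "need at least one column and one row");
    let width = columns.len();
    // Quote every column name before it reaches the SQL text.
    let cols: Vec<String> = columns.iter().map(|c| quote_ident_sketch(c)).collect();
    let tuples: Vec<String> = (0..rows)
        .map(|r| {
            let params: Vec<String> = (0..width)
                .map(|c| format!("${}", r * width + c + 1))
                .collect();
            format!("({})", params.join(", "))
        })
        .collect();
    format!(
        "INSERT INTO {} ({}) VALUES {}",
        quote_ident_sketch(table),
        cols.join(", "),
        tuples.join(", ")
    )
}
```

Each statement binds rows times columns parameters, so for the same `batch_size` a wide table produces far more bind parameters per statement than a narrow one.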
License
Licensed under MIT or Apache-2.0.