faucet-sink-postgres 1.0.0

PostgreSQL sink connector for the faucet-stream ecosystem

faucet-sink-postgres

Writes JSON records to a PostgreSQL table using either JSONB column mode (storing each record as a single jsonb value) or AutoMap mode (mapping top-level JSON keys directly to table columns). Uses connection pooling via sqlx and efficient multi-row INSERT statements.

write_batch accepts whatever slice the pipeline hands it. When batch_size > 0 and the slice is larger than batch_size, the sink re-chunks internally and issues one multi-row INSERT per chunk. When batch_size = 0, the entire slice is sent as a single logical write (AutoMap may still split it to stay under Postgres' bind-parameter limit). See Streaming and batching for the tradeoffs.

Installation

[dependencies]
faucet-sink-postgres = "1.0"
serde_json = "1"
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

faucet-stream = { version = "1.0", features = ["sink-postgres"] }

Quick Start

use faucet_sink_postgres::{PostgresSink, PostgresSinkConfig};
use faucet_core::Sink;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = PostgresSinkConfig::new(
        "postgres://user:password@localhost:5432/mydb",
        "events",
    );

    let sink = PostgresSink::new(config).await?;

    let records = vec![
        json!({"user_id": "u123", "event": "signup", "metadata": {"source": "web"}}),
        json!({"user_id": "u456", "event": "login", "metadata": {"source": "mobile"}}),
    ];

    let rows_written = sink.write_batch(&records).await?;
    println!("Wrote {rows_written} rows");

    Ok(())
}

Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| connection_url | String | (required) | PostgreSQL connection URL (e.g. postgres://user:pass@host:5432/db) |
| table_name | String | (required) | Target table name |
| schema | Option<String> | None (null in JSON) | Schema (namespace) qualifying table_name. When set, both AutoMap column discovery and the INSERT target schema.table_name explicitly. When unset, the table resolves against the connection's search_path. |
| column_mapping | PostgresColumnMapping | Jsonb { column: "data" } | How JSON records map to table columns (see Column Mapping below) |
| batch_size | usize | 1000 | Maximum rows per multi-row INSERT; 0 disables re-chunking. See Streaming and batching below |
| max_connections | u32 | 5 | Maximum number of connections in the connection pool |

The Debug implementation masks the connection_url with *** to prevent credential leakage in logs.
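The crate's own Debug impl is not reproduced here. As a minimal sketch of the general technique, assuming the whole URL is replaced by a fixed placeholder, a hand-written Debug impl can print *** in place of the secret field. MaskedConfig is a hypothetical stand-in, not the crate's PostgresSinkConfig.

```rust
use std::fmt;

/// Hypothetical config type (not the crate's); only two fields are shown.
pub struct MaskedConfig {
    pub connection_url: String,
    pub table_name: String,
}

impl fmt::Debug for MaskedConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("MaskedConfig")
            // Print a fixed placeholder instead of the URL, which may embed a password.
            .field("connection_url", &"***")
            // Non-secret fields are printed normally.
            .field("table_name", &self.table_name)
            .finish()
    }
}
```

With this pattern, logging the config with {:?} never exposes the credentials, because the real URL is never passed to the formatter.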

Streaming and batching

The PostgreSQL sink re-chunks each incoming StreamPage into multi-row INSERT statements of bounded size, so no single statement exceeds Postgres' per-statement bind-parameter limit.

  • batch_size > 0 (default 1000): the sink splits the incoming slice into chunks of batch_size rows and issues one multi-row INSERT per chunk. The recommended value is 1000, a common sweet spot for Postgres multi-row INSERTs; larger chunks rarely add throughput. AutoMap mode binds one parameter per column per row, so the sink additionally splits each chunk until rows × columns stays within Postgres' 65 535 bind-parameter ceiling; a wide table therefore never produces a statement the server rejects, whatever batch_size is. JSONB mode binds a single jsonb[] array regardless of row count, so the ceiling does not apply.
  • batch_size = 0: the "no batching" sentinel. The entire upstream StreamPage is forwarded as a single logical write. Use this when the source already emits page sizes tuned for Postgres, for example a REST source with batch_size: 1000 feeding a JSONB table. AutoMap still sub-splits internally to respect the 65 535-parameter ceiling.

batch_size only controls chunk size; connection pooling, identifier quoting, and JSONB vs AutoMap behaviour are unaffected by it.
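The sizing rule above can be sketched as plain arithmetic. This is an illustration of the described behaviour, not the crate's code: Mode, rows_per_insert, and chunk_sizes are hypothetical names, and the sketch assumes each AutoMap column contributes exactly one bind parameter per row.

```rust
/// Per-statement bind-parameter ceiling quoted above (the wire protocol counts parameters in 16 bits).
const MAX_BIND_PARAMS: usize = 65_535;

/// Hypothetical mirror of the two column-mapping modes.
#[derive(Clone, Copy)]
pub enum Mode {
    Jsonb,
    AutoMap { columns: usize },
}

/// Rows one INSERT may carry, given batch_size and the incoming slice length.
pub fn rows_per_insert(batch_size: usize, slice_len: usize, mode: Mode) -> usize {
    // batch_size = 0 is the "no batching" sentinel: the whole slice is one logical write.
    let requested = if batch_size == 0 { slice_len } else { batch_size.min(slice_len) };
    match mode {
        // JSONB binds a single jsonb[] array, so row count never approaches the ceiling.
        Mode::Jsonb => requested,
        // AutoMap binds rows * columns parameters, so cap rows to fit the ceiling.
        Mode::AutoMap { columns } => requested.min(MAX_BIND_PARAMS / columns.max(1)),
    }
}

/// Sizes of the INSERT statements a slice of `slice_len` rows would be split into.
pub fn chunk_sizes(batch_size: usize, slice_len: usize, mode: Mode) -> Vec<usize> {
    let per = rows_per_insert(batch_size, slice_len, mode).max(1);
    (0..slice_len)
        .step_by(per)
        .map(|start| per.min(slice_len - start))
        .collect()
}
```

For example, a 100-column AutoMap table caps each statement at 65 535 / 100 = 655 rows, so even batch_size = 1000 (or 0) yields 655-row statements.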

Column Mapping (PostgresColumnMapping)

| Variant | Description |
|---|---|
| Jsonb { column } | Insert each record as a single jsonb value. The column name defaults to "data" but can be overridden. Uses PostgreSQL's unnest($1::jsonb[]) for efficient batch inserts. |
| AutoMap | Map top-level JSON keys directly to table columns. Column names are discovered from the PostgreSQL catalog, scoped (via to_regclass) to exactly the relation the INSERT targets: the configured schema if set, otherwise the search_path-resolved table. Only keys that match existing columns are inserted; extra keys are silently ignored. Records with no matching keys are skipped with a warning. |
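The AutoMap key-matching rule can be sketched as a filter over each record. To stay self-contained, the sketch models a record as a hypothetical Record map of key to already-rendered text rather than a serde_json value, and uses eprintln! as a stand-in for the sink's warning log; project and project_batch are illustrative names.

```rust
use std::collections::BTreeMap;

/// Hypothetical record shape: top-level key -> value already rendered as text.
pub type Record = BTreeMap<String, String>;

/// Keep only keys that name real table columns; None if nothing matches.
pub fn project(record: &Record, table_columns: &[&str]) -> Option<Record> {
    let kept: Record = record
        .iter()
        .filter(|(k, _)| table_columns.contains(&k.as_str())) // extra keys are dropped
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();
    if kept.is_empty() { None } else { Some(kept) }
}

/// Apply `project` to a batch, skipping (and warning about) records with no matching keys.
pub fn project_batch(records: &[Record], table_columns: &[&str]) -> Vec<Record> {
    records
        .iter()
        .enumerate()
        .filter_map(|(i, r)| {
            let projected = project(r, table_columns);
            if projected.is_none() {
                // Stand-in for the sink's warning log.
                eprintln!("warning: record {i} has no keys matching table columns; skipped");
            }
            projected
        })
        .collect()
}
```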

Builder Methods

use faucet_sink_postgres::{PostgresSinkConfig, PostgresColumnMapping};

// JSONB mode with custom column name
let config = PostgresSinkConfig::new("postgres://localhost/mydb", "events")
    .column_mapping(PostgresColumnMapping::Jsonb { column: "payload".into() })
    .with_batch_size(1000)
    .max_connections(10);

// AutoMap mode
let config = PostgresSinkConfig::new("postgres://localhost/mydb", "events")
    .column_mapping(PostgresColumnMapping::AutoMap)
    .with_batch_size(250);

Config Loading

use faucet_core::config::{load_json, load_env_file};
use faucet_sink_postgres::PostgresSinkConfig;

// From a JSON file
let config: PostgresSinkConfig = load_json("config.json")?;

// From an .env file with a prefix
let config: PostgresSinkConfig = load_env_file(".env", "PG_SINK")?;

Example JSON config (JSONB mode)

{
  "connection_url": "postgres://writer:s3cret@db.example.com:5432/analytics",
  "table_name": "raw_events",
  "column_mapping": {
    "jsonb": {
      "column": "data"
    }
  },
  "batch_size": 1000,
  "max_connections": 5
}

Example JSON config (AutoMap mode)

{
  "connection_url": "postgres://writer:s3cret@db.example.com:5432/analytics",
  "table_name": "events",
  "column_mapping": "auto_map",
  "batch_size": 1000,
  "max_connections": 10
}

Example .env file

PG_SINK_CONNECTION_URL=postgres://writer:s3cret@db.example.com:5432/analytics
PG_SINK_TABLE_NAME=raw_events
PG_SINK_COLUMN_MAPPING='{"jsonb":{"column":"data"}}'
PG_SINK_BATCH_SIZE=1000
PG_SINK_MAX_CONNECTIONS=5
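Judging from the example above, each variable name appears to be the prefix, an underscore, and the upper-cased field name, with JSON-valued fields single-quoted. The sketch below encodes that inferred convention; env_key and render_env are hypothetical helpers, and the exact rules load_env_file applies are an assumption.

```rust
/// Hypothetical: the variable name read for `field` under `prefix`,
/// inferred from the example (connection_url -> PG_SINK_CONNECTION_URL).
pub fn env_key(prefix: &str, field: &str) -> String {
    format!("{}_{}", prefix, field.to_ascii_uppercase())
}

/// Hypothetical: render KEY=value lines for (field, value) pairs.
pub fn render_env(prefix: &str, pairs: &[(&str, &str)]) -> String {
    pairs
        .iter()
        .map(|(field, value)| {
            // JSON values (such as column_mapping) are single-quoted, as in the example file.
            let value = if value.starts_with('{') { format!("'{value}'") } else { value.to_string() };
            format!("{}={}\n", env_key(prefix, field), value)
        })
        .collect()
}
```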

Config Schema Introspection

use faucet_core::Sink;
use faucet_sink_postgres::PostgresSink;

let sink = PostgresSink::new(config).await?;
let schema = sink.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);

Pipeline Usage

use faucet_core::Pipeline;
use faucet_source_rest::{RestStream, RestStreamConfig};
use faucet_sink_postgres::{PostgresSink, PostgresSinkConfig, PostgresColumnMapping};

let source_config = RestStreamConfig::new("https://api.example.com", "/v1/users");
let source = RestStream::new(source_config);

let sink_config = PostgresSinkConfig::new(
    "postgres://writer:pass@localhost:5432/app",
    "users",
)
.column_mapping(PostgresColumnMapping::AutoMap);

let sink = PostgresSink::new(sink_config).await?;

let result = Pipeline::new(source, sink).run().await?;
println!("Transferred {} records", result.records_written);

Examples

JSONB mode -- store entire records in a single column

This mode is ideal for schemaless ingestion where you want to store raw JSON and query it later with PostgreSQL's JSONB operators.

-- Table schema
CREATE TABLE raw_events (
    id SERIAL PRIMARY KEY,
    data JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

let config = PostgresSinkConfig::new(
    "postgres://localhost/analytics",
    "raw_events",
)
.column_mapping(PostgresColumnMapping::Jsonb { column: "data".into() })
.with_batch_size(1000)
.max_connections(5);

let sink = PostgresSink::new(config).await?;
sink.write_batch(&records).await?;
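For the table above, the statement shape described under How It Works is INSERT INTO table (col) SELECT * FROM unnest($1::jsonb[]). The sketch below assembles that text with always-quoted identifiers and optional schema qualification; jsonb_insert_sql and this quote_ident are hypothetical helpers illustrating the shape, not the crate's internals.

```rust
/// Quote an identifier: wrap it in double quotes and double any embedded double quote.
pub fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Build the JSONB insert; $1 is the whole chunk bound as one jsonb[] parameter.
pub fn jsonb_insert_sql(schema: Option<&str>, table: &str, column: &str) -> String {
    let target = match schema {
        Some(s) => format!("{}.{}", quote_ident(s), quote_ident(table)),
        None => quote_ident(table), // resolved against search_path
    };
    format!("INSERT INTO {target} ({}) SELECT * FROM unnest($1::jsonb[])", quote_ident(column))
}
```

Because every row travels inside one array parameter, the statement text is identical for a 10-row and a 1000-row chunk.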

AutoMap mode -- map JSON keys to table columns

This mode automatically discovers column names from the table schema and maps matching JSON keys. Columns use SQL-injection-safe quoted identifiers.

-- Table schema
CREATE TABLE events (
    user_id TEXT,
    event TEXT,
    timestamp TIMESTAMPTZ,
    amount NUMERIC
);

let config = PostgresSinkConfig::new(
    "postgres://localhost/analytics",
    "events",
)
.column_mapping(PostgresColumnMapping::AutoMap)
.with_batch_size(1000)
.max_connections(10);

let sink = PostgresSink::new(config).await?;

let records = vec![
    json!({"user_id": "u1", "event": "purchase", "amount": 29.99}),
    json!({"user_id": "u2", "event": "signup"}),  // no "amount": bound as SQL NULL for this row
];
sink.write_batch(&records).await?;
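The union-of-keys rule explains the comment on the second record: "amount" joins the column set because the first record supplies it, so the second row binds NULL there, while "timestamp" is omitted entirely because no record supplies it. A sketch of that rule, with a hypothetical Record type of key to rendered text and illustrative names column_set and row_values:

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical record shape: top-level key -> value already rendered as text.
pub type Record = BTreeMap<String, String>;

/// Table columns (in declared order) that at least one record in the batch supplies.
pub fn column_set<'a>(table_columns: &[&'a str], batch: &[Record]) -> Vec<&'a str> {
    let present: HashSet<&str> = batch
        .iter()
        .flat_map(|r| r.keys().map(String::as_str))
        .collect();
    table_columns.iter().copied().filter(|c| present.contains(*c)).collect()
}

/// One row of bind values aligned with `columns`; None stands for SQL NULL.
pub fn row_values(columns: &[&str], record: &Record) -> Vec<Option<String>> {
    columns.iter().map(|c| record.get(*c).cloned()).collect()
}
```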

Connection pooling for high-throughput workloads

let config = PostgresSinkConfig::new(
    "postgres://writer:pass@db-primary.internal:5432/warehouse",
    "metrics",
)
.max_connections(20)
.with_batch_size(1000);

let sink = PostgresSink::new(config).await?;

How It Works

  • A connection pool is created in PostgresSink::new() using sqlx::PgPool with the configured max_connections.
  • write_batch() slices records into chunks of batch_size (or forwards the whole slice when batch_size = 0) and inserts each chunk using a single multi-row INSERT statement.
  • In JSONB mode, inserts use INSERT INTO table (col) SELECT * FROM unnest($1::jsonb[]), which binds the whole chunk as a single jsonb[] parameter.
  • In AutoMap mode, each column's name and underlying type (udt_name) are queried from the PostgreSQL catalog, scoped via to_regclass to exactly the relation the INSERT targets (the configured schema, else the search_path-resolved table), so a same-named table in another schema can't pollute the column set. The sink then builds a multi-row INSERT INTO ... VALUES ($1::int4, $2::timestamptz), ... with a per-column cast and binds each value as text, so the destination column's input function parses it: numbers, booleans, timestamps, and uuids land in their native column types, and json/jsonb columns receive JSON text. Values are deliberately not bound as jsonb, because that would make typed columns such as the TIMESTAMPTZ and NUMERIC columns in the AutoMap example fail at runtime. The column set is the union of record keys across the batch (in declared table order), so a field present only in a later record is still written; a row missing a column binds SQL NULL.
  • All identifiers (table names, column names) are quoted using quote_ident() to prevent SQL injection.
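The AutoMap statement shape from the list above, a multi-row VALUES list with one $n::type placeholder per column per row, can be sketched as follows. automap_insert_sql is a hypothetical helper; it assumes columns arrive as (name, udt_name) pairs from the catalog and casts with the raw udt_name, which suits built-in types but might need quoting for some user-defined type names.

```rust
/// Quote an identifier: wrap it in double quotes and double any embedded double quote.
fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Build INSERT INTO "t" ("a", "b") VALUES ($1::type_a, $2::type_b), ($3::type_a, $4::type_b), ...
/// Every value is later bound as text; the per-column cast lets the column's input function parse it.
pub fn automap_insert_sql(table: &str, columns: &[(&str, &str)], rows: usize) -> String {
    let col_list: Vec<String> = columns.iter().map(|(name, _)| quote_ident(name)).collect();
    let tuples: Vec<String> = (0..rows)
        .map(|row| {
            let placeholders: Vec<String> = columns
                .iter()
                .enumerate()
                // Parameters are numbered row-major starting at $1.
                .map(|(i, (_, udt))| format!("${}::{}", row * columns.len() + i + 1, udt))
                .collect();
            format!("({})", placeholders.join(", "))
        })
        .collect();
    format!(
        "INSERT INTO {} ({}) VALUES {}",
        quote_ident(table),
        col_list.join(", "),
        tuples.join(", ")
    )
}
```

The highest placeholder number equals rows × columns, which is why AutoMap chunks must stay within the 65 535-parameter ceiling.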

License

Licensed under MIT or Apache-2.0.