faucet-sink-postgres 0.2.0

PostgreSQL sink connector for the faucet-stream ecosystem

faucet-sink-postgres



Writes JSON records to a PostgreSQL table using either JSONB column mode (storing each record as a single jsonb value) or AutoMap mode (mapping top-level JSON keys directly to table columns). Uses connection pooling via sqlx and efficient multi-row INSERT statements.

Installation

[dependencies]
faucet-sink-postgres = "0.2"
tokio = { version = "1", features = ["full"] }
serde_json = "1"

Or via the umbrella crate:

faucet-stream = { version = "0.2", features = ["sink-postgres"] }

Quick Start

use faucet_sink_postgres::{PostgresSink, PostgresSinkConfig};
use faucet_core::Sink;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = PostgresSinkConfig::new(
        "postgres://user:password@localhost:5432/mydb",
        "events",
    );

    let sink = PostgresSink::new(config).await?;

    let records = vec![
        json!({"user_id": "u123", "event": "signup", "metadata": {"source": "web"}}),
        json!({"user_id": "u456", "event": "login", "metadata": {"source": "mobile"}}),
    ];

    let rows_written = sink.write_batch(&records).await?;
    println!("Wrote {rows_written} rows");

    Ok(())
}

Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| connection_url | String | (required) | PostgreSQL connection URL (e.g. postgres://user:pass@host:5432/db) |
| table_name | String | (required) | Target table name |
| column_mapping | PostgresColumnMapping | Jsonb { column: "data" } | How to map JSON records to table columns (see below) |
| batch_size | usize | 500 | Maximum number of rows per INSERT statement |
| max_connections | u32 | 5 | Maximum number of connections in the connection pool |

The Debug implementation masks the connection_url with *** to prevent credential leakage in logs.
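A minimal sketch of this kind of masking with std::fmt, assuming the whole URL is replaced (the text does not say whether only the password portion is hidden). SinkConfigSketch is a hypothetical stand-in, not the crate's PostgresSinkConfig.

```rust
use std::fmt;

// Hypothetical mirror of the config; field names follow the table above.
pub struct SinkConfigSketch {
    pub connection_url: String,
    pub table_name: String,
    pub batch_size: usize,
    pub max_connections: u32,
}

impl fmt::Debug for SinkConfigSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SinkConfigSketch")
            // Never print the real URL: it may embed a username and password.
            .field("connection_url", &"***")
            .field("table_name", &self.table_name)
            .field("batch_size", &self.batch_size)
            .field("max_connections", &self.max_connections)
            .finish()
    }
}

fn main() {
    let cfg = SinkConfigSketch {
        connection_url: "postgres://writer:s3cret@db.example.com:5432/analytics".to_string(),
        table_name: "raw_events".to_string(),
        batch_size: 500,
        max_connections: 5,
    };
    // Safe to log: the URL is rendered as "***".
    println!("{cfg:?}");
}
```

Hand-writing Debug (instead of deriving it) is what makes the masking possible, since a derived impl would print every field verbatim.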

Column Mapping (PostgresColumnMapping)

| Variant | Description |
|---|---|
| Jsonb { column } | Insert each record as a single jsonb column. The column name defaults to "data" but can be overridden. Uses PostgreSQL's unnest($1::jsonb[]) for efficient batch inserts. |
| AutoMap | Map top-level JSON keys directly to table columns. Column names are discovered from information_schema.columns. Only keys that match existing columns are inserted; extra keys are silently ignored. Records with no matching keys are skipped with a warning. |
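The AutoMap filtering rules can be sketched as follows. This assumes a record is a flat map from key to an already-encoded value (Record is a hypothetical alias standing in for a serde_json object) and that table_columns is the list read from information_schema.columns; project_record is an illustrative name, not part of the crate.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical stand-in for a flat JSON object: key -> encoded value.
type Record = BTreeMap<String, String>;

/// Keep only keys that exist as table columns; extra keys are dropped.
/// Returns None when nothing matches, mirroring "skipped with a warning".
fn project_record(record: &Record, table_columns: &[&str]) -> Option<Record> {
    let known: HashSet<&str> = table_columns.iter().copied().collect();
    let projected: Record = record
        .iter()
        .filter(|(k, _)| known.contains(k.as_str()))
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();
    if projected.is_empty() {
        eprintln!("warning: record has no keys matching table columns; skipping");
        None
    } else {
        Some(projected)
    }
}

fn main() {
    let columns = ["user_id", "event", "timestamp", "amount"];
    let mut rec = Record::new();
    rec.insert("user_id".to_string(), "\"u1\"".to_string());
    rec.insert("browser".to_string(), "\"firefox\"".to_string()); // not a column
    let kept = project_record(&rec, &columns).expect("user_id matches a column");
    println!("{kept:?}");
}
```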

Builder Methods

use faucet_sink_postgres::{PostgresSinkConfig, PostgresColumnMapping};

// JSONB mode with custom column name
let config = PostgresSinkConfig::new("postgres://localhost/mydb", "events")
    .column_mapping(PostgresColumnMapping::Jsonb { column: "payload".into() })
    .batch_size(1000)
    .max_connections(10);

// AutoMap mode
let config = PostgresSinkConfig::new("postgres://localhost/mydb", "events")
    .column_mapping(PostgresColumnMapping::AutoMap)
    .batch_size(250);
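The builder methods follow the consuming-builder pattern: each call takes self by value and returns the updated config, which is what allows the chained calls above. A std-only sketch, with ConfigSketch and ColumnMapping as hypothetical mirrors of the crate's types and defaults copied from the Configuration table:

```rust
#[derive(Debug, Clone, PartialEq)]
enum ColumnMapping {
    Jsonb { column: String },
    AutoMap,
}

struct ConfigSketch {
    connection_url: String,
    table_name: String,
    column_mapping: ColumnMapping,
    batch_size: usize,
    max_connections: u32,
}

impl ConfigSketch {
    fn new(connection_url: impl Into<String>, table_name: impl Into<String>) -> Self {
        Self {
            connection_url: connection_url.into(),
            table_name: table_name.into(),
            // Defaults from the Configuration table.
            column_mapping: ColumnMapping::Jsonb { column: "data".to_string() },
            batch_size: 500,
            max_connections: 5,
        }
    }

    // Each setter consumes the config and hands it back, enabling chaining.
    fn column_mapping(mut self, mapping: ColumnMapping) -> Self {
        self.column_mapping = mapping;
        self
    }

    fn batch_size(mut self, n: usize) -> Self {
        self.batch_size = n;
        self
    }

    fn max_connections(mut self, n: u32) -> Self {
        self.max_connections = n;
        self
    }
}

fn main() {
    let cfg = ConfigSketch::new("postgres://localhost/mydb", "events")
        .column_mapping(ColumnMapping::AutoMap)
        .batch_size(250);
    println!(
        "table={} mapping={:?} batch_size={} max_connections={}",
        cfg.table_name, cfg.column_mapping, cfg.batch_size, cfg.max_connections
    );
}
```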

Config Loading

use faucet_core::config::{load_json, load_env_file};
use faucet_sink_postgres::PostgresSinkConfig;

// From a JSON file
let config: PostgresSinkConfig = load_json("config.json")?;

// From an .env file with a prefix
let config: PostgresSinkConfig = load_env_file(".env", "PG_SINK")?;

Example JSON config (JSONB mode)

{
  "connection_url": "postgres://writer:s3cret@db.example.com:5432/analytics",
  "table_name": "raw_events",
  "column_mapping": {
    "jsonb": {
      "column": "data"
    }
  },
  "batch_size": 500,
  "max_connections": 5
}

Example JSON config (AutoMap mode)

{
  "connection_url": "postgres://writer:s3cret@db.example.com:5432/analytics",
  "table_name": "events",
  "column_mapping": "auto_map",
  "batch_size": 500,
  "max_connections": 10
}

Example .env file

PG_SINK_CONNECTION_URL=postgres://writer:s3cret@db.example.com:5432/analytics
PG_SINK_TABLE_NAME=raw_events
PG_SINK_COLUMN_MAPPING='{"jsonb":{"column":"data"}}'
PG_SINK_BATCH_SIZE=500
PG_SINK_MAX_CONNECTIONS=5
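The .env example suggests each variable name is the prefix, an underscore, and the upper-cased field name. A sketch of that lookup, assuming this naming rule and that one layer of surrounding single quotes is stripped; prefixed_fields is hypothetical, and the crate's load_env_file may behave differently (for example, in how it turns the column_mapping string into an enum).

```rust
use std::collections::HashMap;

/// Parse .env-style text and return entries that start with `PREFIX_`,
/// keyed by the lower-cased remainder (PG_SINK_BATCH_SIZE -> batch_size).
fn prefixed_fields(env_text: &str, prefix: &str) -> HashMap<String, String> {
    let wanted = format!("{prefix}_");
    let mut out = HashMap::new();
    for line in env_text.lines() {
        let line = line.trim();
        // Skip blank lines and comments.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // Split on the first '=' only, so values may contain '='.
        let Some((key, value)) = line.split_once('=') else { continue };
        if let Some(field) = key.trim().strip_prefix(wanted.as_str()) {
            // Drop one layer of surrounding single quotes, as in the JSON value above.
            let value = value.trim();
            let value = value
                .strip_prefix('\'')
                .and_then(|v| v.strip_suffix('\''))
                .unwrap_or(value);
            out.insert(field.to_lowercase(), value.to_string());
        }
    }
    out
}

fn main() {
    let env = "PG_SINK_TABLE_NAME=raw_events\n\
               PG_SINK_COLUMN_MAPPING='{\"jsonb\":{\"column\":\"data\"}}'\n\
               PG_SINK_BATCH_SIZE=500\n\
               OTHER_KEY=ignored\n";
    let fields = prefixed_fields(env, "PG_SINK");
    // Numeric fields still need parsing; fall back to the documented default.
    let batch_size: usize = fields
        .get("batch_size")
        .and_then(|v| v.parse().ok())
        .unwrap_or(500);
    println!("{fields:?} batch_size={batch_size}");
}
```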

Config Schema Introspection

use faucet_core::Sink;
use faucet_sink_postgres::PostgresSink;

let sink = PostgresSink::new(config).await?;
let schema = sink.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);

Pipeline Usage

use faucet_core::Pipeline;
use faucet_source_rest::{RestStream, RestStreamConfig};
use faucet_sink_postgres::{PostgresSink, PostgresSinkConfig, PostgresColumnMapping};

let source_config = RestStreamConfig::new("https://api.example.com", "/v1/users");
let source = RestStream::new(source_config);

let sink_config = PostgresSinkConfig::new(
    "postgres://writer:pass@localhost:5432/app",
    "users",
)
.column_mapping(PostgresColumnMapping::AutoMap);

let sink = PostgresSink::new(sink_config).await?;

let result = Pipeline::new(source, sink).run().await?;
println!("Transferred {} records", result.records_written);
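Conceptually, a pipeline like this drains the source and hands each batch to the sink, summing the rows written. The sketch below shows that loop synchronously with hypothetical SourceSketch and SinkSketch traits; the real faucet_core traits are async, and their method names and signatures are not documented here.

```rust
/// Hypothetical synchronous stand-ins for a source and a sink.
trait SourceSketch {
    /// Next page of records, or None when the source is exhausted.
    fn next_page(&mut self) -> Option<Vec<String>>;
}

trait SinkSketch {
    /// Write a batch and report how many rows were written.
    fn write_batch(&mut self, records: &[String]) -> Result<usize, String>;
}

struct RunResult {
    records_written: usize,
}

/// Pull pages until the source is empty; stop at the first sink error.
fn run_pipeline<S: SourceSketch, K: SinkSketch>(
    source: &mut S,
    sink: &mut K,
) -> Result<RunResult, String> {
    let mut records_written = 0;
    while let Some(page) = source.next_page() {
        records_written += sink.write_batch(&page)?;
    }
    Ok(RunResult { records_written })
}

/// Source that yields pre-built pages in order.
struct VecSource {
    pages: Vec<Vec<String>>,
}

impl SourceSketch for VecSource {
    fn next_page(&mut self) -> Option<Vec<String>> {
        if self.pages.is_empty() { None } else { Some(self.pages.remove(0)) }
    }
}

/// Sink that just keeps every record it receives.
struct CollectingSink {
    rows: Vec<String>,
}

impl SinkSketch for CollectingSink {
    fn write_batch(&mut self, records: &[String]) -> Result<usize, String> {
        self.rows.extend_from_slice(records);
        Ok(records.len())
    }
}

fn main() {
    let mut source = VecSource {
        pages: vec![
            vec!["{\"id\":1}".into(), "{\"id\":2}".into()],
            vec!["{\"id\":3}".into()],
        ],
    };
    let mut sink = CollectingSink { rows: Vec::new() };
    let result = run_pipeline(&mut source, &mut sink).expect("sink never fails here");
    println!("Transferred {} records", result.records_written);
}
```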

Examples

JSONB mode -- store entire records in a single column

This mode is ideal for schemaless ingestion where you want to store raw JSON and query it later with PostgreSQL's JSONB operators.

-- Table schema
CREATE TABLE raw_events (
    id SERIAL PRIMARY KEY,
    data JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

let config = PostgresSinkConfig::new(
    "postgres://localhost/analytics",
    "raw_events",
)
.column_mapping(PostgresColumnMapping::Jsonb { column: "data".into() })
.batch_size(500)
.max_connections(5);

let sink = PostgresSink::new(config).await?;
// records: Vec<serde_json::Value>, built as in the Quick Start
sink.write_batch(&records).await?;
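The JSONB-mode statement can be reproduced with a small helper. Because only the data column is listed, PostgreSQL fills id from its SERIAL sequence and created_at from DEFAULT NOW(). quote_ident and jsonb_insert_sql are hypothetical helpers; unlike PostgreSQL's built-in quote_ident(), this version always quotes.

```rust
/// Always-quote a PostgreSQL identifier: wrap it in double quotes and
/// double any embedded double quote.
fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// One statement per chunk: the whole chunk is bound as a single jsonb[] ($1)
/// and unnest() expands it into one row per element.
fn jsonb_insert_sql(table: &str, column: &str) -> String {
    format!(
        "INSERT INTO {} ({}) SELECT * FROM unnest($1::jsonb[])",
        quote_ident(table),
        quote_ident(column)
    )
}

fn main() {
    println!("{}", jsonb_insert_sql("raw_events", "data"));
}
```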

AutoMap mode -- map JSON keys to table columns

This mode discovers the table's column names from the schema and writes each matching top-level JSON key to the column of the same name. Column names are emitted as quoted identifiers, so they cannot be used for SQL injection.

-- Table schema
CREATE TABLE events (
    user_id TEXT,
    event TEXT,
    timestamp TIMESTAMPTZ,
    amount NUMERIC
);

let config = PostgresSinkConfig::new(
    "postgres://localhost/analytics",
    "events",
)
.column_mapping(PostgresColumnMapping::AutoMap)
.batch_size(1000)
.max_connections(10);

let sink = PostgresSink::new(config).await?;

let records = vec![
    json!({"user_id": "u1", "event": "purchase", "amount": 29.99}),
    json!({"user_id": "u2", "event": "signup"}),  // no "amount" key: inserted as NULL
];
sink.write_batch(&records).await?;
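A sketch of the AutoMap statement builder: one placeholder per column per record, numbered continuously across rows, with a missing key contributing None (sent as NULL). It assumes the column list is every column discovered for the table, and it carries values as plain strings where the crate binds them as JSONB; automap_insert is a hypothetical name.

```rust
use std::collections::BTreeMap;

/// Always-quote a PostgreSQL identifier, doubling embedded quotes.
fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Build `INSERT INTO "t" ("a", "b") VALUES ($1, $2), ($3, $4), ...` plus the
/// flat parameter list. A record missing a column contributes None (NULL).
fn automap_insert(
    table: &str,
    columns: &[&str],
    records: &[BTreeMap<&str, &str>],
) -> (String, Vec<Option<String>>) {
    let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
    let mut rows = Vec::with_capacity(records.len());
    let mut params = Vec::with_capacity(records.len() * columns.len());
    for record in records {
        let placeholders: Vec<String> = columns
            .iter()
            .map(|col| {
                // Push the value (or None), then use its 1-based position.
                params.push(record.get(col).map(|v| v.to_string()));
                format!("${}", params.len())
            })
            .collect();
        rows.push(format!("({})", placeholders.join(", ")));
    }
    let sql = format!(
        "INSERT INTO {} ({}) VALUES {}",
        quote_ident(table),
        cols.join(", "),
        rows.join(", ")
    );
    (sql, params)
}

fn main() {
    let columns = ["user_id", "event", "timestamp", "amount"];
    let r1 = BTreeMap::from([("user_id", "u1"), ("event", "purchase"), ("amount", "29.99")]);
    let r2 = BTreeMap::from([("user_id", "u2"), ("event", "signup")]);
    let (sql, params) = automap_insert("events", &columns, &[r1, r2]);
    println!("{sql}");
    println!("{params:?}");
}
```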

Connection pooling for high-throughput workloads

let config = PostgresSinkConfig::new(
    "postgres://writer:pass@db-primary.internal:5432/warehouse",
    "metrics",
)
.max_connections(20)
.batch_size(1000);

let sink = PostgresSink::new(config).await?;
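max_connections caps how many connections the pool holds; once all are checked out, further writers wait. The sketch below models only that cap with a Mutex and Condvar. BoundedPool and Conn are hypothetical; sqlx's PgPool additionally opens connections lazily, applies an acquire timeout, and can test connections before handing them out.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical stand-in for a database connection.
struct Conn {
    id: u32,
}

/// Minimal bounded pool: exactly `max_connections` connections exist, and a
/// caller that finds none idle blocks until one is returned.
struct BoundedPool {
    idle: Mutex<Vec<Conn>>,
    available: Condvar,
}

impl BoundedPool {
    fn new(max_connections: u32) -> Self {
        let conns: Vec<Conn> = (0..max_connections).map(|id| Conn { id }).collect();
        Self { idle: Mutex::new(conns), available: Condvar::new() }
    }

    /// Block until a connection is idle, then take it.
    fn acquire(&self) -> Conn {
        let mut idle = self.idle.lock().unwrap();
        loop {
            if let Some(conn) = idle.pop() {
                return conn;
            }
            // Releases the lock while waiting; re-acquires it on wake-up.
            idle = self.available.wait(idle).unwrap();
        }
    }

    /// Non-blocking variant: None when every connection is checked out.
    fn try_acquire(&self) -> Option<Conn> {
        self.idle.lock().unwrap().pop()
    }

    /// Return a connection and wake one waiting caller.
    fn release(&self, conn: Conn) {
        self.idle.lock().unwrap().push(conn);
        self.available.notify_one();
    }
}

fn main() {
    let pool = Arc::new(BoundedPool::new(2));
    let handles: Vec<_> = (0..6)
        .map(|job| {
            let pool = Arc::clone(&pool);
            thread::spawn(move || {
                let conn = pool.acquire();
                println!("job {job} using connection {}", conn.id);
                pool.release(conn);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```

Six jobs share two connections here, so at most two hold a connection at any moment; raising max_connections raises that ceiling at the cost of more server-side sessions.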

How It Works

  • A connection pool is created in PostgresSink::new() using sqlx::PgPool with the configured max_connections.
  • write_batch() splits records into chunks of batch_size and inserts each chunk using a single multi-row INSERT statement.
  • In JSONB mode, inserts use INSERT INTO table (col) SELECT * FROM unnest($1::jsonb[]), so an entire chunk is sent as one jsonb[] array parameter.
  • In AutoMap mode, column names are queried from information_schema.columns. A multi-row INSERT INTO ... VALUES ($1, $2), ($3, $4), ... is built dynamically. Column values are bound as JSONB, and missing keys are bound as NULL.
  • All identifiers (table names, column names) are quoted using quote_ident() to prevent SQL injection.
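Chunking is a slice split by batch_size. For AutoMap there is one extra constraint worth keeping in mind: PostgreSQL's extended query protocol sends the parameter count as a 16-bit integer, so one statement can bind at most 65,535 parameters, and rows × columns must stay within that. The text does not say whether the crate clamps batch_size for this; the sketch below does, as an assumption. JSONB mode binds a single array parameter, so the limit does not apply there. rows_per_statement and plan_chunks are hypothetical names.

```rust
/// Upper bound on bind parameters in a single PostgreSQL statement.
const MAX_BIND_PARAMS: usize = 65_535;

/// Rows per AutoMap INSERT: the configured batch_size, clamped so that
/// rows * columns never exceeds MAX_BIND_PARAMS (and never below 1 row).
fn rows_per_statement(batch_size: usize, num_columns: usize) -> usize {
    let cap = MAX_BIND_PARAMS / num_columns.max(1);
    batch_size.min(cap).max(1)
}

/// Split records into the chunks that each become one multi-row INSERT.
fn plan_chunks<T>(records: &[T], rows_per_stmt: usize) -> Vec<&[T]> {
    records.chunks(rows_per_stmt).collect()
}

fn main() {
    let records: Vec<u32> = (0..1_234).collect();
    let rows = rows_per_statement(500, 4);
    let chunks = plan_chunks(&records, rows);
    // 1,234 records at 500 rows per statement: three INSERTs (500, 500, 234).
    println!(
        "{} statements, last has {} rows",
        chunks.len(),
        chunks.last().map_or(0, |c| c.len())
    );
}
```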

License

Licensed under MIT or Apache-2.0.