# faucet-sink-postgres
[crates.io](https://crates.io/crates/faucet-sink-postgres) · [docs.rs](https://docs.rs/faucet-sink-postgres)
PostgreSQL sink connector for the [faucet-stream](https://github.com/PawanSikawat/faucet-stream) ecosystem.
Writes JSON records to a PostgreSQL table using either JSONB column mode (storing each record as a single `jsonb` value) or AutoMap mode (mapping top-level JSON keys directly to table columns). Uses connection pooling via `sqlx` and efficient multi-row `INSERT` statements.
## Installation
```toml
[dependencies]
faucet-sink-postgres = "0.1"
tokio = { version = "1", features = ["full"] }
```
Or via the umbrella crate:
```toml
faucet-stream = { version = "0.2", features = ["sink-postgres"] }
```
## Quick Start
```rust
use faucet_sink_postgres::{PostgresSink, PostgresSinkConfig};
use faucet_core::Sink;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Default mapping is JSONB into a `data` column, so `events` needs a `data JSONB` column.
    let config = PostgresSinkConfig::new(
        "postgres://user:password@localhost:5432/mydb",
        "events",
    );
    let sink = PostgresSink::new(config).await?;

    let records = vec![
        json!({"user_id": "u123", "event": "signup", "metadata": {"source": "web"}}),
        json!({"user_id": "u456", "event": "login", "metadata": {"source": "mobile"}}),
    ];

    let rows_written = sink.write_batch(&records).await?;
    println!("Wrote {rows_written} rows");
    Ok(())
}
```
## Configuration
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `connection_url` | `String` | *(required)* | PostgreSQL connection URL (e.g. `postgres://user:pass@host:5432/db`) |
| `table_name` | `String` | *(required)* | Target table name |
| `column_mapping` | `PostgresColumnMapping` | `Jsonb { column: "data" }` | How to map JSON records to table columns (see below) |
| `batch_size` | `usize` | `500` | Maximum number of rows per INSERT statement |
| `max_connections` | `u32` | `5` | Maximum number of connections in the connection pool |

The `Debug` implementation masks the `connection_url` with `***` to prevent credential leakage in logs.
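
One way to get this masking is a hand-written `Debug` impl in place of `#[derive(Debug)]`. The sketch below uses a hypothetical `SinkConfig` stand-in with only a few fields; it illustrates the technique and is not the crate's actual code.

```rust
use std::fmt;

// Hypothetical stand-in for the crate's config type (not its real definition).
pub struct SinkConfig {
    pub connection_url: String,
    pub table_name: String,
    pub batch_size: usize,
}

impl fmt::Debug for SinkConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SinkConfig")
            // Never print the real URL: it may embed a username and password.
            .field("connection_url", &"***")
            .field("table_name", &self.table_name)
            .field("batch_size", &self.batch_size)
            .finish()
    }
}
```

With this impl, `format!("{:?}", cfg)` yields `SinkConfig { connection_url: "***", ... }` no matter which credentials the URL contains.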
### Column Mapping (`PostgresColumnMapping`)
| Variant | Behavior |
|---------|----------|
| `Jsonb { column }` | Insert each record as a single `jsonb` column. The column name defaults to `"data"` but can be overridden. Uses PostgreSQL's `unnest($1::jsonb[])` for efficient batch inserts. |
| `AutoMap` | Map top-level JSON keys directly to table columns. Column names are discovered from `information_schema.columns`. Only keys that match existing columns are inserted; extra keys are silently ignored. Records with no matching keys are skipped with a warning. |
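
The `AutoMap` matching rules can be sketched as a projection of each record onto the discovered column list. To stay dependency-free, this sketch models a record as a hypothetical `Record` map from top-level key to serialized JSON value rather than a `serde_json::Value`, and assumes `columns` holds the names read from `information_schema.columns`.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of a JSON record: top-level key -> serialized value.
type Record = BTreeMap<String, String>;

/// Project `record` onto `columns` (in table order).
/// - Returns `None` when no key matches any column (the "skipped" case).
/// - Otherwise returns one slot per column; keys the record lacks become
///   `None` (bound as NULL), and keys with no matching column are ignored.
fn project(record: &Record, columns: &[&str]) -> Option<Vec<Option<String>>> {
    if !columns.iter().any(|c| record.contains_key(*c)) {
        return None;
    }
    Some(columns.iter().map(|c| record.get(*c).cloned()).collect())
}
```

Extra keys never reach the output because the result is driven by `columns`, not by the record's own keys.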
### Builder Methods
```rust
use faucet_sink_postgres::{PostgresSinkConfig, PostgresColumnMapping};

// JSONB mode with custom column name
let config = PostgresSinkConfig::new("postgres://localhost/mydb", "events")
    .column_mapping(PostgresColumnMapping::Jsonb { column: "payload".into() })
    .batch_size(1000)
    .max_connections(10);

// AutoMap mode
let config = PostgresSinkConfig::new("postgres://localhost/mydb", "events")
    .column_mapping(PostgresColumnMapping::AutoMap)
    .batch_size(250);
```
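
These methods follow the consuming-builder pattern: each setter takes `self` by value, updates one field, and returns it, so calls chain. A minimal sketch with hypothetical `Config` and `ColumnMapping` types that mirror the defaults in the Configuration table (not the crate's own definitions):

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnMapping {
    Jsonb { column: String },
    AutoMap,
}

#[derive(Debug, Clone)]
pub struct Config {
    pub connection_url: String,
    pub table_name: String,
    pub column_mapping: ColumnMapping,
    pub batch_size: usize,
    pub max_connections: u32,
}

impl Config {
    pub fn new(connection_url: &str, table_name: &str) -> Self {
        Self {
            connection_url: connection_url.to_string(),
            table_name: table_name.to_string(),
            // Defaults taken from the Configuration table.
            column_mapping: ColumnMapping::Jsonb { column: "data".to_string() },
            batch_size: 500,
            max_connections: 5,
        }
    }

    // Each setter consumes the builder and hands it back.
    pub fn column_mapping(mut self, mapping: ColumnMapping) -> Self {
        self.column_mapping = mapping;
        self
    }

    pub fn batch_size(mut self, n: usize) -> Self {
        self.batch_size = n;
        self
    }

    pub fn max_connections(mut self, n: u32) -> Self {
        self.max_connections = n;
        self
    }
}
```

Fields that are never set keep their defaults, which is why the AutoMap example above still ends up with `max_connections` of 5.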
## Config Loading
```rust
use faucet_core::config::{load_json, load_env_file};
use faucet_sink_postgres::PostgresSinkConfig;
// From a JSON file
let config: PostgresSinkConfig = load_json("config.json")?;
// From an .env file with a prefix
let config: PostgresSinkConfig = load_env_file(".env", "PG_SINK")?;
```
### Example JSON config (JSONB mode)
```json
{
  "connection_url": "postgres://writer:s3cret@db.example.com:5432/analytics",
  "table_name": "raw_events",
  "column_mapping": {
    "jsonb": {
      "column": "data"
    }
  },
  "batch_size": 500,
  "max_connections": 5
}
```
### Example JSON config (AutoMap mode)
```json
{
  "connection_url": "postgres://writer:s3cret@db.example.com:5432/analytics",
  "table_name": "events",
  "column_mapping": "auto_map",
  "batch_size": 500,
  "max_connections": 10
}
```
### Example .env file
```env
PG_SINK_CONNECTION_URL=postgres://writer:s3cret@db.example.com:5432/analytics
PG_SINK_TABLE_NAME=raw_events
PG_SINK_COLUMN_MAPPING='{"jsonb":{"column":"data"}}'
PG_SINK_BATCH_SIZE=500
PG_SINK_MAX_CONNECTIONS=5
```
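
The `.env` example suggests that each variable name is the prefix, an underscore, and the upper-cased field name. The sketch below shows that mapping under the assumption that the loader strips `PREFIX_`, lowercases the remainder, and removes one layer of surrounding quotes. `env_fields` is a hypothetical helper, not `load_env_file`'s implementation.

```rust
use std::collections::HashMap;

/// Hypothetical: collect config-field -> raw value pairs for `prefix`
/// from `.env` text. Blank lines, `#` comments, and other prefixes are skipped.
fn env_fields(contents: &str, prefix: &str) -> HashMap<String, String> {
    let wanted = format!("{prefix}_");
    let mut fields = HashMap::new();
    for line in contents.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // Split at the first '=' only, so URLs and JSON values stay intact.
        let Some((key, value)) = line.split_once('=') else { continue };
        let Some(field) = key.trim().strip_prefix(wanted.as_str()) else { continue };
        // Strip one layer of matching single or double quotes, if present.
        let value = value.trim();
        let value = value
            .strip_prefix('\'')
            .and_then(|v| v.strip_suffix('\''))
            .or_else(|| value.strip_prefix('"').and_then(|v| v.strip_suffix('"')))
            .unwrap_or(value);
        fields.insert(field.to_lowercase(), value.to_string());
    }
    fields
}
```

Under this assumption, `PG_SINK_COLUMN_MAPPING` becomes the `column_mapping` field, and its single-quoted value is passed on as the raw JSON string `{"jsonb":{"column":"data"}}`.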
## Config Schema Introspection
```rust
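use faucet_core::Sink;
use faucet_sink_postgres::PostgresSink;

// `config` is a PostgresSinkConfig built as in the earlier examples.
let sink = PostgresSink::new(config).await?;
let schema = sink.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);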
```
## Pipeline Usage
```rust
use faucet_core::Pipeline;
use faucet_source_rest::{RestStream, RestStreamConfig};
use faucet_sink_postgres::{PostgresSink, PostgresSinkConfig, PostgresColumnMapping};

let source_config = RestStreamConfig::new("https://api.example.com", "/v1/users");
let source = RestStream::new(source_config);

let sink_config = PostgresSinkConfig::new(
    "postgres://writer:pass@localhost:5432/app",
    "users",
)
.column_mapping(PostgresColumnMapping::AutoMap);
let sink = PostgresSink::new(sink_config).await?;

let result = Pipeline::new(source, sink).run().await?;
println!("Transferred {} records", result.records_written);
```
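
Conceptually, a pipeline drains the source and hands each page of records to the sink, summing the rows written. This synchronous sketch uses hypothetical `PageSource` and `BatchSink` traits (the real faucet-core traits are async, and `Pipeline` may batch differently) to show that loop and how a `records_written`-style total could be accumulated.

```rust
/// Hypothetical, synchronous stand-in for a source that yields pages.
trait PageSource {
    /// Returns the next page of records, or `None` when the source is exhausted.
    fn next_page(&mut self) -> Option<Vec<String>>;
}

/// Hypothetical, synchronous stand-in for a sink.
trait BatchSink {
    /// Writes one batch and returns how many rows were written.
    fn write_batch(&mut self, records: &[String]) -> usize;
}

/// Drain `source` into `sink`, returning the total number of rows written.
fn run<S: PageSource, K: BatchSink>(source: &mut S, sink: &mut K) -> usize {
    let mut written = 0;
    while let Some(page) = source.next_page() {
        // Skip empty pages so the sink never sees a zero-row INSERT.
        if !page.is_empty() {
            written += sink.write_batch(&page);
        }
    }
    written
}
```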
## Examples
### JSONB mode -- store entire records in a single column
This mode is ideal for schemaless ingestion where you want to store raw JSON and query it later with PostgreSQL's JSONB operators.
```sql
-- Table schema
CREATE TABLE raw_events (
    id         SERIAL PRIMARY KEY,
    data       JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
```
```rust
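use faucet_core::Sink;
use faucet_sink_postgres::{PostgresColumnMapping, PostgresSink, PostgresSinkConfig};
use serde_json::json;

let config = PostgresSinkConfig::new(
    "postgres://localhost/analytics",
    "raw_events",
)
.column_mapping(PostgresColumnMapping::Jsonb { column: "data".into() })
.batch_size(500)
.max_connections(5);
let sink = PostgresSink::new(config).await?;

// Each record is stored whole in `data`; `id` and `created_at` take their defaults.
let records = vec![json!({"event": "signup", "metadata": {"source": "web"}})];
sink.write_batch(&records).await?;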
```
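
The statement used in JSONB mode can be sketched as a string builder. It binds the whole chunk as one `jsonb[]` parameter, so the SQL text does not grow with the batch. `jsonb_insert_sql` and `quote_ident` are hypothetical Rust helpers; this `quote_ident` always quotes, whereas PostgreSQL's built-in `quote_ident()` only adds quotes when the name requires them.

```rust
/// Wrap an identifier in double quotes, doubling any embedded double quote,
/// so the name is always treated as an identifier and never as SQL.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Build the JSONB-mode INSERT; `$1` is bound to the whole chunk as `jsonb[]`.
fn jsonb_insert_sql(table: &str, column: &str) -> String {
    format!(
        "INSERT INTO {} ({}) SELECT * FROM unnest($1::jsonb[])",
        quote_ident(table),
        quote_ident(column)
    )
}
```

For the table above this produces `INSERT INTO "raw_events" ("data") SELECT * FROM unnest($1::jsonb[])`. If identifiers are always quoted like this, they are also case-sensitive: a table created unquoted as `Raw_Events` is stored as `raw_events`, so the configured `table_name` must be `raw_events`.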
### AutoMap mode -- map JSON keys to table columns
This mode discovers column names from the table schema and maps each matching top-level JSON key to its column. Table and column names are written as quoted identifiers, so they cannot be interpreted as SQL.
```sql
-- Table schema
CREATE TABLE events (
    user_id   TEXT,
    event     TEXT,
    timestamp TIMESTAMPTZ,
    amount    NUMERIC
);
```
```rust
use faucet_core::Sink;
use faucet_sink_postgres::{PostgresColumnMapping, PostgresSink, PostgresSinkConfig};
use serde_json::json;

let config = PostgresSinkConfig::new(
    "postgres://localhost/analytics",
    "events",
)
.column_mapping(PostgresColumnMapping::AutoMap)
.batch_size(1000)
.max_connections(10);
let sink = PostgresSink::new(config).await?;

let records = vec![
    json!({"user_id": "u1", "event": "purchase", "amount": 29.99}),
    // Missing keys are bound as NULL: `timestamp` in both records, `amount` here.
    json!({"user_id": "u2", "event": "signup"}),
];
sink.write_batch(&records).await?;
```
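
In AutoMap mode the dynamically built statement numbers its parameters row by row. The sketch below generates that SQL text for a given column list and row count; `automap_insert_sql`, `quote_ident`, and `max_rows` are hypothetical helpers. It also reflects a PostgreSQL protocol limit: one statement can carry at most 65,535 bind parameters, so rows × columns per INSERT must stay within it. Whether the crate caps `batch_size` accordingly is not documented here.

```rust
/// Always-quote identifier helper (doubles embedded double quotes).
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Build `INSERT INTO "t" ("a", "b") VALUES ($1, $2), ($3, $4), ...`
/// for `rows` rows, numbering parameters row by row starting at $1.
fn automap_insert_sql(table: &str, columns: &[&str], rows: usize) -> String {
    let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
    let n = columns.len();
    let tuples: Vec<String> = (0..rows)
        .map(|r| {
            // Row r uses parameters r*n + 1 ..= r*n + n.
            let params: Vec<String> = (1..=n).map(|i| format!("${}", r * n + i)).collect();
            format!("({})", params.join(", "))
        })
        .collect();
    format!(
        "INSERT INTO {} ({}) VALUES {}",
        quote_ident(table),
        cols.join(", "),
        tuples.join(", ")
    )
}

/// Largest row count per statement under the 65,535-parameter protocol limit.
fn max_rows(columns: usize) -> usize {
    65_535 / columns.max(1)
}
```

For the four-column `events` table, `max_rows(4)` is 16,383, so a `batch_size` of 1,000 stays well under the limit; a table with 100 columns would allow at most 655 rows per statement.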
### Connection pooling for high-throughput workloads
```rust
let config = PostgresSinkConfig::new(
    "postgres://writer:pass@db-primary.internal:5432/warehouse",
    "metrics",
)
.max_connections(20)
.batch_size(1000);
let sink = PostgresSink::new(config).await?;
```
## How It Works
- A connection pool is created in `PostgresSink::new()` using `sqlx::PgPool` with the configured `max_connections`.
- `write_batch()` splits records into chunks of `batch_size` and inserts each chunk using a single multi-row INSERT statement.
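- In JSONB mode, inserts use `INSERT INTO table (col) SELECT * FROM unnest($1::jsonb[])`, which sends the whole chunk as a single `jsonb[]` parameter, so the statement text and parameter count stay the same regardless of chunk size.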
- In AutoMap mode, column names are queried from `information_schema.columns`. A multi-row `INSERT INTO ... VALUES ($1, $2), ($3, $4), ...` is built dynamically. Column values are bound as JSONB. Missing keys are bound as null.
- All identifiers (table names, column names) are quoted using `quote_ident()` to prevent SQL injection.
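
The chunking step maps directly onto Rust's `slice::chunks`. A sketch, with `plan_batches` as a hypothetical helper that returns the size of each INSERT a call to `write_batch()` would issue; it treats a `batch_size` of 0 as 1 because `chunks(0)` panics (whether the crate validates this is not stated here).

```rust
/// Split `records` into chunks of at most `batch_size` (one INSERT each)
/// and return the row count of every chunk, in order.
fn plan_batches<T>(records: &[T], batch_size: usize) -> Vec<usize> {
    // `chunks(0)` panics, so clamp the size to at least 1.
    records.chunks(batch_size.max(1)).map(|c| c.len()).collect()
}
```

For 1,201 records with the default `batch_size` of 500, this yields three statements of 500, 500, and 201 rows; an empty input yields no statements at all.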
## License
Licensed under MIT or Apache-2.0.