faucet-sink-sqlite 1.0.1

SQLite sink connector for the faucet-stream ecosystem
faucet-sink-sqlite

Writes JSON records to a SQLite table in one of two modes: JSON column mode, which stores each record as serialized JSON text in a single column, or AutoMap mode, which maps top-level JSON keys directly to table columns. The sink pools connections via sqlx, issues multi-row INSERT statements, and wraps each batch in a transaction (BEGIN/COMMIT) so that rows are committed once per batch rather than once per row. Column discovery uses PRAGMA table_info.

write_batch accepts whatever slice the pipeline hands it. When batch_size > 0 and the slice is larger than batch_size, the sink re-chunks internally and issues one multi-row INSERT (in its own transaction) per chunk; when batch_size = 0, the entire slice is written in a single transaction — see Streaming and batching for the tradeoffs.

Installation

[dependencies]
faucet-sink-sqlite = "1.0"
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

faucet-stream = { version = "1.0", features = ["sink-sqlite"] }

Quick Start

use faucet_sink_sqlite::{SqliteSink, SqliteSinkConfig};
use faucet_core::Sink;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = SqliteSinkConfig::new("sqlite:///tmp/app.db", "events");
    let sink = SqliteSink::new(config).await?;

    let records = vec![
        json!({"user_id": "u123", "event": "signup"}),
        json!({"user_id": "u456", "event": "login"}),
    ];

    let rows_written = sink.write_batch(&records).await?;
    println!("Wrote {rows_written} rows");

    Ok(())
}

Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| database_url | String | (required) | SQLite database URL. Can be a file path (e.g. /tmp/app.db), a sqlite: URL, or sqlite::memory: for an in-memory database. |
| table_name | String | (required) | Target table name. |
| column_mapping | SqliteColumnMapping | Json { column: "data" } | How to map JSON records to table columns (see below). |
| batch_size | usize | 1000 | Maximum number of rows per multi-row INSERT. See Streaming and batching below. |
| max_connections | u32 | 1 | Maximum number of connections in the connection pool. SQLite serializes writers at the file level, so one writer is the safe default: a multi-connection pool against a single file races for the write lock and risks SQLITE_BUSY. The pool opens connections in WAL mode with a 5s busy_timeout, so raising this lets extra connections read concurrently with the single writer. |

Streaming and batching

The SQLite sink re-chunks each incoming StreamPage to keep individual multi-row INSERT statements within SQLite's per-statement parameter limits and to amortise per-transaction overhead.

  • batch_size > 0 (default 1000): the sink splits the incoming slice into batch_size-row chunks and issues one multi-row INSERT per chunk, each wrapped in its own BEGIN/COMMIT transaction. The recommended value is 1000: large enough to amortise transaction overhead, small enough to stay well under SQLite's default SQLITE_MAX_VARIABLE_NUMBER (32766 since 3.32.0). In AutoMap mode the sink also splits each chunk further so that rows × columns never exceeds that limit, so a wide table does not fail with "too many SQL variables" regardless of batch_size.
  • batch_size = 0: the "no batching" sentinel. The entire upstream StreamPage is written in a single multi-row INSERT inside one transaction. Use this when the source already emits page sizes tuned for SQLite, for example a Postgres source with batch_size: 1000. Pages large enough to push the parameter count past SQLite's per-statement limit will fail at the prepare step.

batch_size is purely a chunk-size knob — transaction wrapping (one BEGIN/COMMIT per chunk), identifier quoting, and per-record error reporting are unchanged.
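
The chunking rules above can be sketched with plain arithmetic. This is a minimal illustration, not the crate's actual code: plan_chunks, its automap_columns parameter, and the MAX_SQL_VARIABLES constant are hypothetical names, and the sketch assumes SQLite's default variable limit is in effect.

```rust
/// SQLite's default SQLITE_MAX_VARIABLE_NUMBER since 3.32.0 (assumed here).
const MAX_SQL_VARIABLES: usize = 32_766;

/// Hypothetical helper: returns the row count of each INSERT the sink would
/// issue for a page of `page_len` records.
///
/// - `batch_size == 0` is the "no batching" sentinel: one INSERT for the page.
/// - `automap_columns = Some(n)` models AutoMap mode, where each chunk is
///   split further so that rows * n stays within the variable limit.
fn plan_chunks(page_len: usize, batch_size: usize, automap_columns: Option<usize>) -> Vec<usize> {
    if page_len == 0 {
        return Vec::new();
    }
    if batch_size == 0 {
        // The whole page goes out as a single statement, with no safety split.
        return vec![page_len];
    }

    // Largest row count that keeps rows * columns within the limit.
    let cap = match automap_columns {
        Some(cols) if cols > 0 => (MAX_SQL_VARIABLES / cols).max(1),
        _ => usize::MAX,
    };

    let mut sizes = Vec::new();
    let mut remaining = page_len;
    while remaining > 0 {
        // First cut: batch_size-row chunks.
        let mut chunk = remaining.min(batch_size);
        remaining -= chunk;
        // Second cut: split a chunk that would exceed the variable limit.
        while chunk > 0 {
            let part = chunk.min(cap);
            sizes.push(part);
            chunk -= part;
        }
    }
    sizes
}
```

Under these assumptions, a 2500-row page with batch_size = 1000 yields chunks of 1000, 1000, and 500 rows, while a 1000-row chunk against a 100-column AutoMap table is split into 327, 327, 327, and 19 rows, since 32766 / 100 rounds down to 327.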

Column Mapping (SqliteColumnMapping)

| Variant | Description |
|---|---|
| Json { column } | Insert each record as a serialized JSON text string in a single column. The column name defaults to "data" but can be overridden. Uses a multi-row INSERT wrapped in a transaction. |
| AutoMap | Map top-level JSON keys directly to table columns. Column names are discovered using PRAGMA table_info(table_name). Only keys that match existing columns are inserted; extra keys are silently ignored. Records with no matching keys are skipped with a warning. |
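
To make the AutoMap matching rules concrete, here is a small std-only sketch. It is an assumption-laden model rather than the crate's implementation: records are simplified to string-to-string maps instead of JSON values, and Record, record, insert_columns, and bind_rows are hypothetical names introduced for illustration.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a JSON object (the real sink works on JSON values).
type Record = HashMap<String, String>;

/// Hypothetical convenience constructor for the sketch.
fn record(pairs: &[(&str, &str)]) -> Record {
    pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}

/// Columns to list in the INSERT: table columns, in table order, that appear
/// as a key in at least one record. Keys that match no column are ignored.
fn insert_columns(table_columns: &[&str], records: &[Record]) -> Vec<String> {
    table_columns
        .iter()
        .filter(|col| records.iter().any(|r| r.contains_key(**col)))
        .map(|col| col.to_string())
        .collect()
}

/// One row of bound values per record; `None` stands for SQL NULL.
/// Records that match none of the columns are skipped entirely.
fn bind_rows(columns: &[String], records: &[Record]) -> Vec<Vec<Option<String>>> {
    records
        .iter()
        .filter(|r| columns.iter().any(|c| r.contains_key(c)))
        .map(|r| columns.iter().map(|c| r.get(c).cloned()).collect())
        .collect()
}
```

With table columns user_id, event, and amount, a record that carries only user_id and event binds NULL for amount, a record with an extra key has that key dropped, and a record whose keys match no column produces no row at all.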

Builder Methods

use faucet_sink_sqlite::{SqliteSinkConfig, SqliteColumnMapping};

// JSON mode with custom column
let config = SqliteSinkConfig::new("sqlite:///data/app.db", "events")
    .column_mapping(SqliteColumnMapping::Json { column: "payload".into() })
    .with_batch_size(1000)
    .max_connections(3);

// AutoMap mode
let config = SqliteSinkConfig::new("sqlite:///data/app.db", "events")
    .column_mapping(SqliteColumnMapping::AutoMap)
    .with_batch_size(250);

Config Loading

use faucet_core::config::{load_json, load_env_file};
use faucet_sink_sqlite::SqliteSinkConfig;

// From a JSON file
let config: SqliteSinkConfig = load_json("config.json")?;

// From an .env file with a prefix
let config: SqliteSinkConfig = load_env_file(".env", "SQLITE_SINK")?;

Example JSON config (JSON mode)

{
  "database_url": "/data/analytics.db",
  "table_name": "raw_events",
  "column_mapping": {
    "json": {
      "column": "data"
    }
  },
  "batch_size": 1000,
  "max_connections": 5
}

Example JSON config (AutoMap mode)

{
  "database_url": "/data/analytics.db",
  "table_name": "events",
  "column_mapping": "auto_map",
  "batch_size": 1000,
  "max_connections": 3
}

Example .env file

SQLITE_SINK_DATABASE_URL=/data/analytics.db
SQLITE_SINK_TABLE_NAME=raw_events
SQLITE_SINK_COLUMN_MAPPING='{"json":{"column":"data"}}'
SQLITE_SINK_BATCH_SIZE=1000
SQLITE_SINK_MAX_CONNECTIONS=5

Config Schema Introspection

use faucet_core::Sink;
use faucet_sink_sqlite::SqliteSink;

// `config` is a SqliteSinkConfig built as shown above
let sink = SqliteSink::new(config).await?;
let schema = sink.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);

Pipeline Usage

use faucet_core::Pipeline;
use faucet_source_rest::{RestStream, RestStreamConfig};
use faucet_sink_sqlite::{SqliteSink, SqliteSinkConfig, SqliteColumnMapping};

let source = RestStream::new(
    RestStreamConfig::new("https://api.example.com", "/v1/events")
);

let sink_config = SqliteSinkConfig::new("/data/events.db", "events")
    .column_mapping(SqliteColumnMapping::AutoMap);

let sink = SqliteSink::new(sink_config).await?;

let result = Pipeline::new(source, sink).run().await?;
println!("Transferred {} records", result.records_written);

Examples

JSON mode -- store records as serialized JSON text

-- Table schema
CREATE TABLE raw_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    data TEXT NOT NULL,
    created_at TEXT DEFAULT (datetime('now'))
);

// Sink configuration
let config = SqliteSinkConfig::new("/tmp/app.db", "raw_events")
    .column_mapping(SqliteColumnMapping::Json { column: "data".into() })
    .with_batch_size(1000);

let sink = SqliteSink::new(config).await?;
sink.write_batch(&records).await?;

AutoMap mode -- map JSON keys to table columns

-- Table schema
CREATE TABLE events (
    user_id TEXT,
    event TEXT,
    amount REAL,
    created_at TEXT DEFAULT (datetime('now'))
);

// Sink configuration
let config = SqliteSinkConfig::new("/tmp/app.db", "events")
    .column_mapping(SqliteColumnMapping::AutoMap)
    .with_batch_size(1000);

let sink = SqliteSink::new(config).await?;

let records = vec![
    json!({"user_id": "u1", "event": "purchase", "amount": 29.99}),
    json!({"user_id": "u2", "event": "signup"}),  // "amount" is absent, so it is bound as SQL NULL
];
sink.write_batch(&records).await?;

In-memory database for testing

let config = SqliteSinkConfig::new("sqlite::memory:", "test_table")
    .column_mapping(SqliteColumnMapping::AutoMap);

let sink = SqliteSink::new(config).await?;

How It Works

  • A connection pool is created in SqliteSink::new() using sqlx::SqlitePool with the configured max_connections (default 1). Each connection is opened in WAL journal mode (journal_mode = WAL) with a 5-second busy_timeout and create_if_missing. WAL lets one writer and multiple readers proceed concurrently, and the busy timeout makes lock contention wait and retry instead of failing immediately with SQLITE_BUSY. WAL on a sqlite::memory: database is a harmless no-op.
  • write_batch() slices the input into batch_size-row chunks (or forwards the whole slice when batch_size = 0). Each chunk is inserted using a single multi-row INSERT statement wrapped in a BEGIN/COMMIT transaction for write performance.
  • In JSON mode, each record is serialized to a JSON string and inserted as INSERT INTO t (col) VALUES (?), (?), ....
  • In AutoMap mode, column names are discovered using PRAGMA table_info(table_name). A multi-row INSERT is built dynamically. Column values are bound as native SQLite types — strings as TEXT, JSON numbers as INTEGER/REAL, booleans as INTEGER 0/1 — so column affinity and typed reads round-trip correctly. Arrays and objects (which have no scalar SQL representation) are bound as their JSON text. The INSERT column set is the union of record keys across the batch (in table order), so a field present only in a later record is still written; a row missing a column binds SQL NULL.
  • All identifiers (table names, column names) are quoted using quote_ident() (double-quote escaping) to prevent SQL injection.
  • Transaction wrapping ensures that either all rows in a batch are committed or none are, providing atomicity per batch.
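
The identifier quoting and multi-row INSERT shape described in the list above can be sketched as follows. The name quote_ident comes from this README, but the body shown here is only an assumed implementation of double-quote escaping, and build_multi_row_insert is a hypothetical helper; the crate's own code may differ.

```rust
/// Assumed implementation of double-quote identifier escaping: wrap the name
/// in double quotes and double any embedded double quote.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Hypothetical helper: builds the SQL text for a multi-row INSERT with
/// `rows` placeholder groups, one `?` per column in each group.
/// Values are bound separately, so only identifiers appear in the SQL text.
fn build_multi_row_insert(table: &str, columns: &[&str], rows: usize) -> String {
    let column_list = columns
        .iter()
        .map(|c| quote_ident(c))
        .collect::<Vec<_>>()
        .join(", ");
    // One group such as "(?, ?)" per row.
    let row_placeholders = format!("({})", vec!["?"; columns.len()].join(", "));
    let values = vec![row_placeholders; rows].join(", ");
    format!(
        "INSERT INTO {} ({}) VALUES {}",
        quote_ident(table),
        column_list,
        values
    )
}
```

For a JSON-mode table t with column data and three records, this sketch produces INSERT INTO "t" ("data") VALUES (?), (?), (?). Because every value travels as a bound parameter, only the table and column names need quoting.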

License

Licensed under MIT or Apache-2.0.