faucet-sink-sqlite
SQLite sink connector for the faucet-stream ecosystem.
Writes JSON records to a SQLite table using either JSON column mode (storing each record as a serialized JSON text value) or AutoMap mode (mapping top-level JSON keys directly to table columns). It uses an sqlx connection pool and multi-row INSERT statements, and wraps each batch in a BEGIN/COMMIT transaction to maximize write throughput. In AutoMap mode, table columns are discovered with PRAGMA table_info.
write_batch accepts whatever slice the pipeline hands it. When batch_size > 0 and the slice is larger than batch_size, the sink re-chunks it internally and issues one multi-row INSERT (in its own transaction) per chunk. When batch_size = 0, the entire slice is written in a single transaction. See Streaming and batching for the tradeoffs.
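The re-chunking rule can be sketched in a few lines of plain Rust. This is an illustration under stated assumptions, not the crate's code. plan_chunks is a hypothetical helper, and returning no chunks at all for an empty slice is an assumption rather than documented behaviour.

```rust
/// Hypothetical helper that mirrors the documented re-chunking rule (not the crate's API).
/// batch_size > 0: chunks of at most batch_size rows (one INSERT + one BEGIN/COMMIT each).
/// batch_size == 0: the "no batching" sentinel, so the whole slice is a single chunk.
fn plan_chunks<T>(records: &[T], batch_size: usize) -> Vec<&[T]> {
    if records.is_empty() {
        // Assumption: an empty slice produces no INSERT at all.
        return Vec::new();
    }
    if batch_size == 0 {
        vec![records]
    } else {
        // slice::chunks yields full chunks followed by one shorter remainder.
        records.chunks(batch_size).collect()
    }
}

fn main() {
    let rows: Vec<u32> = (0..2500).collect();
    let sizes: Vec<usize> = plan_chunks(&rows, 1000).iter().map(|c| c.len()).collect();
    println!("batch_size = 1000 -> chunk sizes {:?}", sizes); // [1000, 1000, 500]
    let whole: Vec<usize> = plan_chunks(&rows, 0).iter().map(|c| c.len()).collect();
    println!("batch_size = 0    -> chunk sizes {:?}", whole); // [2500]
}
```

With the default batch_size of 1000, a 2,500-record page becomes three transactions. With batch_size = 0 it becomes one.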
Installation
```toml
[dependencies]
faucet-sink-sqlite = "1.0"
tokio = { version = "1", features = ["full"] }
```
Or via the umbrella crate:
```toml
faucet-stream = { version = "1.0", features = ["sink-sqlite"] }
```
Quick Start
use ;
use Sink;
use json;
async
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| database_url | String | (required) | SQLite database URL: a file path (e.g. /tmp/app.db), a sqlite: URL, or sqlite::memory: for an in-memory database. |
| table_name | String | (required) | Target table name. |
| column_mapping | SqliteColumnMapping | Json { column: "data" } | How to map JSON records to table columns (see Column Mapping below). |
| batch_size | usize | 1000 | Maximum number of rows per multi-row INSERT; 0 writes each incoming slice as one INSERT. See Streaming and batching below. |
| max_connections | u32 | 1 | Maximum number of connections in the pool. SQLite allows only one writer per database file at a time, so 1 is the safe default: several connections writing to the same file race for the write lock and risk SQLITE_BUSY. Connections are opened in WAL mode with a 5 s busy_timeout, so raising this value lets extra connections read concurrently with the single writer. |
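The defaults in the table above can be illustrated with a std-only mirror of the configuration. ConfigSketch and ColumnMapping are hypothetical stand-ins for SqliteSinkConfig and SqliteColumnMapping. Only the field names, types and default values come from the table. The constructor shape and is_in_memory are assumptions made for the sketch.

```rust
/// Hypothetical std-only mirror of SqliteColumnMapping (not the crate's type).
#[derive(Debug, Clone, PartialEq)]
enum ColumnMapping {
    Json { column: String },
    AutoMap,
}

/// Hypothetical std-only mirror of SqliteSinkConfig, used only to show defaults.
#[derive(Debug, Clone)]
struct ConfigSketch {
    database_url: String, // required
    table_name: String,   // required
    column_mapping: ColumnMapping,
    batch_size: usize,
    max_connections: u32,
}

impl ConfigSketch {
    /// Takes the two required fields; everything else gets the documented default.
    fn new(database_url: &str, table_name: &str) -> Self {
        ConfigSketch {
            database_url: database_url.to_string(),
            table_name: table_name.to_string(),
            column_mapping: ColumnMapping::Json { column: "data".to_string() },
            batch_size: 1000,
            max_connections: 1,
        }
    }

    /// True when database_url uses the in-memory form.
    fn is_in_memory(&self) -> bool {
        self.database_url == "sqlite::memory:"
    }
}

fn main() {
    let cfg = ConfigSketch::new("/tmp/app.db", "raw_events");
    println!("{:?}", cfg);
    let mem = ConfigSketch::new("sqlite::memory:", "raw_events");
    println!("in-memory: {}", mem.is_in_memory());
}
```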
Streaming and batching
By default, the SQLite sink re-chunks each incoming StreamPage to keep individual
multi-row INSERT statements within SQLite's per-statement parameter limit
and to amortise per-transaction overhead.
- batch_size > 0 (default 1000): the sink splits the incoming slice into batch_size-row chunks and issues one multi-row INSERT per chunk, each wrapped in its own BEGIN/COMMIT transaction. The recommended value is 1000: large enough to amortise transaction overhead, small enough to stay well under SQLite's default SQLITE_MAX_VARIABLE_NUMBER (32766 since SQLite 3.32.0). In AutoMap mode the sink also splits each chunk further so that rows × columns never exceeds that limit, so a wide table does not fail with "too many SQL variables" however large batch_size is.
- batch_size = 0: the "no batching" sentinel. The entire upstream StreamPage is written in a single multi-row INSERT inside one transaction. Use this when the source already emits page sizes tuned for SQLite, for example a Postgres source with batch_size: 1000. Pages large enough to push the parameter count past SQLite's per-statement limit will fail at the prepare step.
batch_size is purely a chunk-size knob. Transaction wrapping (one
BEGIN/COMMIT per chunk), identifier quoting, and per-record error
reporting behave the same for every value.
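The AutoMap parameter-limit split is plain arithmetic, which can be sketched as follows. The sketch assumes SQLite's default limit of 32766 and one ? placeholder per INSERT column per row. rows_per_statement and statement_sizes are hypothetical names, and the real sink's splitting may differ in detail.

```rust
/// SQLite's default SQLITE_MAX_VARIABLE_NUMBER since version 3.32.0.
const SQLITE_MAX_VARIABLE_NUMBER: usize = 32_766;

/// Rows that fit in one multi-row INSERT when each row binds `columns` parameters.
/// Hypothetical helper; assumes exactly one `?` per column per row.
fn rows_per_statement(columns: usize) -> usize {
    assert!(columns > 0, "an INSERT needs at least one column");
    // A single row wider than the limit cannot be rescued by splitting,
    // so never go below one row per statement.
    (SQLITE_MAX_VARIABLE_NUMBER / columns).max(1)
}

/// Split a chunk of `chunk_rows` rows (already <= batch_size) into per-statement row counts.
fn statement_sizes(chunk_rows: usize, columns: usize) -> Vec<usize> {
    let per_statement = rows_per_statement(columns);
    let mut sizes = Vec::new();
    let mut remaining = chunk_rows;
    while remaining > 0 {
        let n = remaining.min(per_statement);
        sizes.push(n);
        remaining -= n;
    }
    sizes
}

fn main() {
    // 1000 rows x 3 columns = 3000 parameters: one statement.
    println!("{:?}", statement_sizes(1000, 3)); // [1000]
    // 1000 rows x 50 columns = 50000 parameters: 655 rows (32750 params) + 345 rows.
    println!("{:?}", statement_sizes(1000, 50)); // [655, 345]
}
```

This is why the default batch_size of 1000 is safe for narrow tables as-is, and only wide AutoMap tables need the extra split.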
Column Mapping (SqliteColumnMapping)
| Variant | Description |
|---|---|
| Json { column } | Insert each record as a serialized JSON text string in a single column. The column name defaults to "data" but can be overridden. Uses a multi-row INSERT wrapped in a transaction. |
| AutoMap | Map top-level JSON keys directly to table columns. Column names are discovered using PRAGMA table_info(table_name). Only keys that match existing columns are inserted; extra keys are silently ignored. Records with no matching keys are skipped with a warning. |
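The AutoMap matching rules can be sketched in std-only Rust. The sketch covers three rules: skip records with no matching key, ignore extra keys, and use the union of matching keys in table order with NULL for gaps. Record is a BTreeMap over a hypothetical Value enum standing in for a JSON object. table_columns stands in for the names PRAGMA table_info returns. automap_rows and the warning text are invented for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a scalar JSON value.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Bool(bool),
    Int(i64),
    Real(f64),
    Text(String),
}

/// A record is a JSON object: top-level key -> value.
type Record = BTreeMap<String, Value>;

/// Hypothetical AutoMap planning step. `table_columns` plays the role of the
/// names PRAGMA table_info returns, in table order.
/// Returns the INSERT column list and one row of values per kept record.
fn automap_rows(table_columns: &[&str], records: &[Record]) -> (Vec<String>, Vec<Vec<Value>>) {
    // 1. Skip records that share no key with the table, with a warning.
    let mut kept: Vec<&Record> = Vec::new();
    for (i, record) in records.iter().enumerate() {
        if table_columns.iter().any(|c| record.contains_key(*c)) {
            kept.push(record);
        } else {
            eprintln!("warning: record {} has no keys matching table columns; skipped", i);
        }
    }
    // 2. INSERT column set = union of matching keys across the batch, in table order.
    //    Keys that are not table columns never enter this list, so they are ignored.
    let columns: Vec<String> = table_columns
        .iter()
        .filter(|c| kept.iter().any(|r| r.contains_key(**c)))
        .map(|c| c.to_string())
        .collect();
    // 3. A record missing one of those columns binds NULL for it.
    let rows: Vec<Vec<Value>> = kept
        .iter()
        .map(|r| {
            columns
                .iter()
                .map(|c| r.get(c).cloned().unwrap_or(Value::Null))
                .collect()
        })
        .collect();
    (columns, rows)
}

fn main() {
    let table = ["user_id", "event", "amount", "created_at"];
    let mut a = Record::new();
    a.insert("user_id".to_string(), Value::Text("u1".to_string()));
    a.insert("event".to_string(), Value::Text("click".to_string()));
    a.insert("extra".to_string(), Value::Bool(true)); // not a column: ignored
    let mut b = Record::new();
    b.insert("user_id".to_string(), Value::Text("u2".to_string()));
    b.insert("amount".to_string(), Value::Real(9.5)); // only present in a later record
    let mut c = Record::new();
    c.insert("unrelated".to_string(), Value::Int(1)); // no matching key: skipped
    let (columns, rows) = automap_rows(&table, &[a, b, c]);
    println!("{:?}", columns); // ["user_id", "event", "amount"]
    println!("{:?}", rows);
}
```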
Builder Methods
use ;
// JSON mode with custom column
let config = new
.column_mapping
.with_batch_size
.max_connections;
// AutoMap mode
let config = new
.column_mapping
.with_batch_size;
Config Loading
use ;
use SqliteSinkConfig;
// From a JSON file
let config: SqliteSinkConfig = load_json?;
// From an .env file with a prefix
let config: SqliteSinkConfig = load_env_file?;
Example JSON config (JSON mode)
Example JSON config (AutoMap mode)
Example .env file
SQLITE_SINK_DATABASE_URL=/data/analytics.db
SQLITE_SINK_TABLE_NAME=raw_events
SQLITE_SINK_COLUMN_MAPPING='{"json":{"column":"data"}}'
SQLITE_SINK_BATCH_SIZE=1000
SQLITE_SINK_MAX_CONNECTIONS=5
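The .env keys above are the Configuration field names, upper-cased, behind a prefix. The following is a minimal, hypothetical sketch of that mapping in std-only Rust. It is not load_env_file: fields_with_prefix and strip_quotes are invented names, and stripping one pair of single quotes is an assumption based on the COLUMN_MAPPING line.

```rust
use std::collections::HashMap;

/// Strip one pair of surrounding single quotes, as on the COLUMN_MAPPING line.
fn strip_quotes(value: &str) -> &str {
    value
        .strip_prefix('\'')
        .and_then(|v| v.strip_suffix('\''))
        .unwrap_or(value)
}

/// Hypothetical: keep only prefixed variables and turn the rest of each key into a
/// lowercase config field name (SQLITE_SINK_BATCH_SIZE -> batch_size).
fn fields_with_prefix<'a, I>(vars: I, prefix: &str) -> HashMap<String, String>
where
    I: IntoIterator<Item = (&'a str, &'a str)>,
{
    vars.into_iter()
        .filter_map(|(key, value)| {
            key.strip_prefix(prefix)
                .map(|field| (field.to_ascii_lowercase(), strip_quotes(value).to_string()))
        })
        .collect()
}

fn main() {
    let vars = [
        ("SQLITE_SINK_DATABASE_URL", "/data/analytics.db"),
        ("SQLITE_SINK_TABLE_NAME", "raw_events"),
        ("SQLITE_SINK_COLUMN_MAPPING", r#"'{"json":{"column":"data"}}'"#),
        ("SQLITE_SINK_BATCH_SIZE", "1000"),
        ("SQLITE_SINK_MAX_CONNECTIONS", "5"),
        ("HOME", "/root"), // no prefix: ignored
    ];
    let fields = fields_with_prefix(vars, "SQLITE_SINK_");
    // Numeric fields still have to be parsed into their table types.
    let batch_size: usize = fields["batch_size"].parse().expect("batch_size is a usize");
    let max_connections: u32 = fields["max_connections"].parse().expect("max_connections is a u32");
    println!("{} fields, batch_size={}, max_connections={}", fields.len(), batch_size, max_connections);
    println!("column_mapping = {}", fields["column_mapping"]);
}
```

The column_mapping value stays a JSON string here. A real loader would still have to deserialize it into the mapping enum.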
Config Schema Introspection
use Sink;
let sink = new.await?;
let schema = sink.config_schema;
println!;
Pipeline Usage
use Pipeline;
use ;
use ;
let source = new;
let sink_config = new
.column_mapping;
let sink = new.await?;
let result = new.run.await?;
println!;
Examples
JSON mode -- store records as serialized JSON text
-- Table schema
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
data TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
let config = new
.column_mapping
.with_batch_size;
let sink = new.await?;
sink.write_batch.await?;
AutoMap mode -- map JSON keys to table columns
-- Table schema
(
user_id TEXT,
event TEXT,
amount REAL,
created_at TEXT DEFAULT (datetime('now'))
);
let config = new
.column_mapping
.with_batch_size;
let sink = new.await?;
let records = vec!;
sink.write_batch.await?;
In-memory database for testing
let config = new
.column_mapping;
let sink = new.await?;
How It Works
- A connection pool is created in SqliteSink::new() using sqlx::SqlitePool with the configured max_connections (default 1). Each connection is opened in WAL journal mode (journal_mode = WAL) with a 5-second busy_timeout and create_if_missing. As a result, a writer and readers can proceed concurrently, and lock contention waits and retries instead of failing immediately with SQLITE_BUSY. Requesting WAL on a sqlite::memory: database is a harmless no-op.
- write_batch() slices the input into batch_size-row chunks (or forwards the whole slice when batch_size = 0). Each chunk is inserted with a multi-row INSERT statement wrapped in a BEGIN/COMMIT transaction for write performance.
- In JSON mode, each record is serialized to a JSON string and inserted as INSERT INTO t (col) VALUES (?), (?), ....
- In AutoMap mode, column names are discovered using PRAGMA table_info(table_name), and a multi-row INSERT is built dynamically. Column values are bound as native SQLite types (strings as TEXT, JSON numbers as INTEGER or REAL, booleans as INTEGER 0/1), so column affinity and typed reads round-trip correctly. Arrays and objects, which have no scalar SQL representation, are bound as their JSON text. The INSERT column set is the union of record keys across the batch, in table order, so a field present only in a later record is still written; a row missing one of those columns binds SQL NULL.
- All identifiers (table and column names) are quoted with quote_ident() (double-quote escaping) to prevent SQL injection.
- Each transaction covers one chunk, so either all rows in a chunk are committed or none are. When batch_size > 0 and a single write_batch call spans several chunks, atomicity is per chunk, not per call.
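The SQL shapes described above (quoted identifiers, one ? group per row, a BEGIN/COMMIT around each chunk) can be sketched as string building. This is an illustration only. quote_ident here is a hypothetical re-implementation of the double-quote escaping the list describes. multi_row_insert and chunk_statements are invented names. The real sink binds values and runs transactions through sqlx rather than emitting "BEGIN"/"COMMIT" strings.

```rust
/// Quote an SQLite identifier: wrap it in double quotes and double any embedded
/// double quote. Hypothetical stand-in for the crate's quote_ident().
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Build the multi-row INSERT for `rows` rows, one `?` placeholder per column per row.
fn multi_row_insert(table: &str, columns: &[&str], rows: usize) -> String {
    assert!(!columns.is_empty() && rows > 0, "need at least one column and one row");
    let column_list: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
    let one_row = format!("({})", vec!["?"; columns.len()].join(", "));
    let values = vec![one_row; rows].join(", ");
    format!(
        "INSERT INTO {} ({}) VALUES {}",
        quote_ident(table),
        column_list.join(", "),
        values
    )
}

/// One chunk's statements, with the INSERT wrapped in its own transaction.
fn chunk_statements(table: &str, columns: &[&str], rows: usize) -> Vec<String> {
    vec![
        "BEGIN".to_string(),
        multi_row_insert(table, columns, rows),
        "COMMIT".to_string(),
    ]
}

fn main() {
    // JSON mode: a single text column.
    for stmt in chunk_statements("raw_events", &["data"], 3) {
        println!("{stmt}");
    }
    // A hostile identifier stays one quoted name instead of breaking out of the statement.
    println!("{}", quote_ident("evil\"; DROP TABLE users; --"));
}
```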
License
Licensed under MIT or Apache-2.0.