faucet-source-sqlite
A SQLite query source that executes SQL queries and returns rows as JSON records, with connection pooling and dynamic type probing via sqlx.
Part of the faucet-stream ecosystem.
Installation
[]
= "1.0"
= { = "1", = ["full"] }
Or via the umbrella crate:
= { = "1.0", = ["source-sqlite"] }
Quick Start
use ;
use Source;
async
Configuration
SqliteSourceConfig
| Field | Type | Default | Description |
|---|---|---|---|
database_url |
String |
(required) | SQLite database URL. Can be a file path (e.g. "sqlite:data.db", "sqlite:/path/to/db") or in-memory ("sqlite::memory:") |
query |
String |
(required) | SQL query to execute |
max_connections |
u32 |
10 |
Maximum number of connections in the pool |
batch_size |
usize |
1000 |
Rows per StreamPage emitted by Source::stream_pages. See Streaming and batching below |
Streaming and batching
SqliteSource::stream_pages drives a sqlx row cursor (Query::fetch)
without buffering the full result. Rows are accumulated into a batch_size
buffer and yielded as a StreamPage once the buffer fills; the trailing
partial page (if any) is yielded after the cursor drains.
batch_size = 0 is the "no batching" sentinel — the cursor is drained
completely and the entire result set is emitted in a single StreamPage.
Use it for small lookup tables, or for downstream sinks (SQL COPY,
BigQuery load jobs, Snowflake stage uploads) that prefer one large request
to many small ones. Values larger than MAX_BATCH_SIZE (1,000,000) are
rejected by faucet_core::validate_batch_size.
The sqlite query source has no incremental-replication mode, so every
emitted page carries bookmark: None.
Note — SQLite is an in-process, file-based engine: there is no server-side cursor concept and no network wire to worry about. The streaming implementation bounds client-side memory at
O(batch_size)and lets the sink begin writing as soon as the first batch is parsed off disk, rather than waiting for the whole result set to materialise in aVec.
Supported Column Types
SQLite has dynamic typing -- values are stored as INTEGER, REAL, TEXT, BLOB, or NULL. The source probes each column value in order of specificity:
| SQLite Storage Class | JSON Type |
|---|---|
| TEXT (valid JSON) | Native JSON value |
| TEXT | string |
| INTEGER (i64) | number |
| INTEGER (i32) | number |
| REAL (f64) | number |
| BOOLEAN | boolean |
| BLOB | string (base64) |
| NULL / unsupported | null |
Config Loading
use ;
use SqliteSourceConfig;
let config: SqliteSourceConfig = load_json?;
let config: SqliteSourceConfig = load_env_file?;
Example JSON config
Example .env file
SQLITE_SOURCE_DATABASE_URL=sqlite:data.db
SQLITE_SOURCE_QUERY=SELECT * FROM events
SQLITE_SOURCE_MAX_CONNECTIONS=10
SQLITE_SOURCE_BATCH_SIZE=1000
Config Schema Introspection
use Source;
let source = new.await?;
let schema = source.config_schema;
println!;
Examples
File-based database
use ;
use Source;
let config = new;
let source = new.await?;
let records = source.fetch_all.await?;
In-memory database
use ;
use Source;
let config = new;
let source = new.await?;
let records = source.fetch_all.await?;
assert_eq!;
assert_eq!;
Custom pool size for concurrent access
use ;
let config = new
.with_max_connections;
let source = new.await?;
let records = source.fetch_all.await?;
License
Licensed under MIT or Apache-2.0.