faucet-source-mysql
A MySQL query source that executes SQL queries and returns rows as JSON records, with connection pooling via sqlx.
Part of the faucet-stream ecosystem.
Installation
[dependencies]
faucet-source-mysql = "1.0"
tokio = { version = "1", features = ["full"] }
Or via the umbrella crate:
faucet-stream = { version = "1.0", features = ["source-mysql"] }
Quick Start
use ;
use Source;
async
Configuration
MysqlSourceConfig
| Field | Type | Default | Description |
|---|---|---|---|
| connection_url | String | (required) | MySQL connection URL (e.g. mysql://user:pass@host:3306/db). Masked in debug output for security |
| query | String | (required) | SQL query to execute |
| max_connections | u32 | 10 | Maximum number of connections in the pool |
| batch_size | usize | 1000 | Rows per StreamPage emitted by Source::stream_pages. See Streaming and batching below |
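
The table above says connection_url is masked in debug output. The README does not show how, so the following is only a minimal sketch of one common approach: a hand-written Debug impl that redacts the credentials. ExampleConfig and mask_credentials are hypothetical names used for illustration, and the exact masked form the crate prints may differ.

```rust
use std::fmt;

/// Hypothetical stand-in for the config struct (not the crate's type);
/// it only mirrors the fields listed in the table above.
pub struct ExampleConfig {
    pub connection_url: String,
    pub query: String,
    pub max_connections: u32,
    pub batch_size: usize,
}

/// Replace everything between "://" and the last '@' with "***".
/// Using the last '@' errs on the side of hiding too much rather than too little.
/// URLs without an '@' are returned unchanged.
pub fn mask_credentials(url: &str) -> String {
    let Some(scheme_end) = url.find("://") else {
        // Not a URL shape we recognise: hide it entirely.
        return "***".to_string();
    };
    let rest = &url[scheme_end + 3..];
    match rest.rfind('@') {
        Some(at) => format!("{}://***@{}", &url[..scheme_end], &rest[at + 1..]),
        None => url.to_string(),
    }
}

// Manual Debug impl so that `{:?}` never prints the raw URL.
impl fmt::Debug for ExampleConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ExampleConfig")
            .field("connection_url", &mask_credentials(&self.connection_url))
            .field("query", &self.query)
            .field("max_connections", &self.max_connections)
            .field("batch_size", &self.batch_size)
            .finish()
    }
}
```

With this sketch, formatting a config with `{:?}` shows the host, port, and database but not the user name or password.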
Streaming and batching
MysqlSource::stream_pages drives a sqlx row cursor (Query::fetch)
without buffering the full result. Rows accumulate in a buffer of up to
batch_size rows, which is yielded as a StreamPage once it fills; the
trailing partial page (if any) is yielded after the cursor drains.
batch_size = 0 is the "no batching" sentinel — the cursor is drained
completely and the entire result set is emitted in a single StreamPage.
Use it for small lookup tables, or for downstream sinks (SQL COPY,
BigQuery load jobs, Snowflake stage uploads) that prefer one large request
to many small ones. Values larger than MAX_BATCH_SIZE (1,000,000) are
rejected by faucet_core::validate_batch_size.
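
The batching rules above can be sketched in plain Rust. This is an illustration under stated assumptions, not the crate's implementation: paginate and check_batch_size are hypothetical names, the real source yields pages lazily from an async cursor rather than collecting them into a Vec, and the behaviour for an empty result set (no pages here) is a guess.

```rust
/// Upper bound quoted in the text above.
pub const MAX_BATCH_SIZE: usize = 1_000_000;

/// Illustrative check only; the real validation lives in
/// faucet_core::validate_batch_size, whose signature is not shown in this README.
pub fn check_batch_size(batch_size: usize) -> Result<usize, String> {
    if batch_size > MAX_BATCH_SIZE {
        Err(format!("batch_size {batch_size} exceeds {MAX_BATCH_SIZE}"))
    } else {
        Ok(batch_size)
    }
}

/// Group rows into pages of `batch_size` rows each.
/// `batch_size == 0` means "no batching": every row lands in one page.
pub fn paginate<T>(rows: impl Iterator<Item = T>, batch_size: usize) -> Vec<Vec<T>> {
    let mut pages = Vec::new();
    let mut buffer = Vec::new();
    for row in rows {
        buffer.push(row);
        // A full buffer becomes a page; with batch_size == 0 this never fires.
        if batch_size != 0 && buffer.len() == batch_size {
            pages.push(std::mem::take(&mut buffer));
        }
    }
    // Trailing partial page, or the whole result set when batch_size == 0.
    if !buffer.is_empty() {
        pages.push(buffer);
    }
    pages
}
```

For example, five rows with a batch size of 2 produce pages of 2, 2, and 1 rows, while the same rows with a batch size of 0 produce a single page of 5.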
The MySQL query source has no incremental-replication mode, so every
emitted page carries bookmark: None.
Note: MySQL's wire protocol sends rows from a simple SELECT in a single response (no server-side cursor). The streaming implementation bounds client-side memory at O(batch_size) and lets the sink begin writing as soon as the first batch is parsed off the wire. True server-side cursor streaming (via stored procedures or STREAM-style options) is tracked separately as a follow-up.
Supported Column Types
Columns are automatically converted to JSON values:
| MySQL Type | JSON Type |
|---|---|
| json | Native JSON value |
| varchar, text, char | string |
| bigint | number (i64) |
| int, mediumint | number (i32) |
| smallint, tinyint | number (i16) |
| double | number (f64) |
| float | number (f32) |
| tinyint(1), boolean | boolean |
| datetime, timestamp, date, time | string (RFC 3339 / ISO-8601) |
| decimal, numeric | string (exact precision preserved) |
| blob, binary, varbinary | string (base64) |
| Other / NULL | null |
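
The decimal and numeric row maps to a string rather than a JSON number. A small sketch of the likely motivation: a binary f64 cannot hold every decimal value exactly, so a round trip through f64 can drop digits or trailing zeros. The helper name survives_f64_round_trip is hypothetical and exists only for this illustration.

```rust
/// Hypothetical helper: returns true when `text` comes back unchanged
/// after being parsed as f64 and formatted again.
pub fn survives_f64_round_trip(text: &str) -> bool {
    match text.parse::<f64>() {
        Ok(value) => value.to_string() == text,
        Err(_) => false,
    }
}
```

A short value such as "0.5" survives, but a high-precision value such as "12345678901234567890.123456789" does not, and neither does "19.90", whose trailing zero (the column's scale) is lost. Emitting the database's own text form avoids both problems.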
Config Loading
use ;
use MysqlSourceConfig;
let config: MysqlSourceConfig = load_json?;
let config: MysqlSourceConfig = load_env_file?;
Example JSON config
Example .env file
MYSQL_SOURCE_CONNECTION_URL=mysql://user:password@localhost:3306/mydb
MYSQL_SOURCE_QUERY=SELECT * FROM users
MYSQL_SOURCE_MAX_CONNECTIONS=10
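
To make the mapping from the .env keys above to config fields concrete, here is a minimal, dependency-free sketch. It is not the crate's loader: ExampleConfig, parse_env_lines, and config_from_env are hypothetical names, and the parsing rules (first '=' splits key from value, '#' starts a comment line, no quoting support) are assumptions.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the config struct, limited to the keys shown above.
#[derive(Debug, PartialEq)]
pub struct ExampleConfig {
    pub connection_url: String,
    pub query: String,
    pub max_connections: u32,
}

/// Parse KEY=VALUE lines, skipping blank lines and '#' comments.
/// Splitting on the first '=' keeps any '=' inside the value intact.
pub fn parse_env_lines(text: &str) -> HashMap<String, String> {
    text.lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .filter_map(|line| line.split_once('='))
        .map(|(key, value)| (key.trim().to_string(), value.trim().to_string()))
        .collect()
}

/// Build the config from MYSQL_SOURCE_* keys; max_connections falls back to
/// the documented default of 10 when the key is absent.
pub fn config_from_env(vars: &HashMap<String, String>) -> Result<ExampleConfig, String> {
    let required = |key: &str| {
        vars.get(key)
            .cloned()
            .ok_or_else(|| format!("missing {key}"))
    };
    let max_connections = match vars.get("MYSQL_SOURCE_MAX_CONNECTIONS") {
        Some(raw) => raw
            .parse::<u32>()
            .map_err(|e| format!("MYSQL_SOURCE_MAX_CONNECTIONS: {e}"))?,
        None => 10,
    };
    Ok(ExampleConfig {
        connection_url: required("MYSQL_SOURCE_CONNECTION_URL")?,
        query: required("MYSQL_SOURCE_QUERY")?,
        max_connections,
    })
}
```

Feeding the three lines of the example .env file through parse_env_lines and then config_from_env yields a config with the URL, the query, and a pool size of 10; omitting a required key produces an error instead.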
Config Schema Introspection
use Source;
let source = new.await?;
let schema = source.config_schema;
println!;
Examples
Simple query
use ;
use Source;
let config = new;
let source = new.await?;
let customers = source.fetch_all.await?;
Custom connection pool size
use ;
let config = new
.with_max_connections;
let source = new.await?;
let records = source.fetch_all.await?;
Using with a Pipeline
use ;
use ;
let config = new;
let source = new.await?;
let pipeline = new;
let result = pipeline.run.await?;
License
Licensed under MIT or Apache-2.0.