faucet-core
Shared types, traits, and utilities for the faucet-stream ecosystem.
This is the foundation crate that all faucet source and sink connectors depend on. If you're building a custom connector, this is the only dependency you need.
Installation
[]
= "1.0"
= { = "1", = ["rt"] }
What's Inside
Traits
Source: async trait for fetching records from external systems
Sink: async trait for writing records to external systems
Both traits include a config_schema() method that returns a JSON Schema describing the connector's configuration.
Decorators
TransformingSource: wraps any Source with a fixed Vec<TransformStage> (covering 1→1 Map(RecordTransform), 1→0|1 Filter, 1→0..N Explode, and arbitrary Custom closures), applied per page via instrumented_apply_stages. This is the canonical way for library callers to attach transforms, including filter and explode, to any source. See docs.rs.
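The per-page stage semantics can be sketched in plain Rust. This is a hypothetical simplification, not faucet-core's TransformStage. Records are bare i64 values, and the Stage enum and apply_stages function are illustrative names. The sketch shows how Map (1→1), Filter (1→0 or 1) and Explode (1→0..N) compose left to right over one page. Custom closures are omitted.

```rust
// Hypothetical simplification of per-page stage application; not faucet-core's API.
type Record = i64;

enum Stage {
    /// 1 -> 1: transform each record.
    Map(Box<dyn Fn(Record) -> Record>),
    /// 1 -> 0 or 1: keep records matching the predicate.
    Filter(Box<dyn Fn(&Record) -> bool>),
    /// 1 -> 0..N: replace each record with zero or more records.
    Explode(Box<dyn Fn(Record) -> Vec<Record>>),
}

/// Applies every stage, left to right, to one page of records.
fn apply_stages(stages: &[Stage], page: Vec<Record>) -> Vec<Record> {
    stages.iter().fold(page, |records, stage| match stage {
        Stage::Map(f) => records.into_iter().map(|r| f(r)).collect(),
        Stage::Filter(keep) => records.into_iter().filter(|r| keep(r)).collect(),
        Stage::Explode(f) => records.into_iter().flat_map(|r| f(r)).collect(),
    })
}
```

Because each stage consumes the previous stage's output, a Filter placed before an Explode also limits how many records the Explode can fan out.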
Source::stream_pages (recommended for large sources)
stream_pages(ctx, batch_size) returns a Stream<Item = Result<StreamPage, FaucetError>> where each StreamPage contains a chunk of records plus an optional bookmark. The default implementation wraps fetch_with_context_incremental and chunks the result in memory; sources that can stream natively (REST, CDC, query DBs with cursor pagination) override this method directly to bound source-side memory at O(batch_size). Pipeline::run drives this stream internally; library users do not normally call it themselves.
DEFAULT_BATCH_SIZE is 1000 and MAX_BATCH_SIZE is 1_000_000. Connector authors should call validate_batch_size(n) at config-load time. It enforces this range and reports violations as FaucetError::Config. batch_size = 0 is the "no batching" sentinel: sources emit the entire result set in a single StreamPage (and sinks that expose their own batch_size accept whatever upstream hands them without re-chunking). Use it for small lookup tables or for bulk-load-style sinks (SQL COPY, BigQuery load jobs) that prefer one large request to many small ones.
use ;
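Below is a minimal sketch of the default stream_pages behaviour described above. It validates the batch size, then chunks an already-fetched result in memory. The two constants use the values stated in this README. Everything else is a hypothetical stand-in for illustration:

- check_batch_size returns a String error rather than FaucetError::Config.
- Page and chunk_into_pages are illustrative names.
- Attaching the bookmark only to the final page is an assumption.

```rust
// Values taken from this README; the helpers below are hypothetical stand-ins.
const DEFAULT_BATCH_SIZE: usize = 1000;
const MAX_BATCH_SIZE: usize = 1_000_000;

/// Range check in the spirit of validate_batch_size; 0 stays valid because it is
/// the "no batching" sentinel.
fn check_batch_size(n: usize) -> Result<usize, String> {
    if n > MAX_BATCH_SIZE {
        Err(format!("batch_size {} exceeds MAX_BATCH_SIZE ({})", n, MAX_BATCH_SIZE))
    } else {
        Ok(n)
    }
}

#[derive(Debug, PartialEq)]
struct Page {
    records: Vec<u32>,
    bookmark: Option<u32>,
}

/// Chunks an already-fetched result into pages of at most `batch_size` records.
/// Assumption: the bookmark rides on the final page only. Empty input yields no pages.
fn chunk_into_pages(records: Vec<u32>, batch_size: usize, bookmark: Option<u32>) -> Vec<Page> {
    // batch_size == 0 means "no batching": one page holding everything.
    let size = if batch_size == 0 { records.len().max(1) } else { batch_size };
    let mut pages: Vec<Page> = records
        .chunks(size)
        .map(|chunk| Page { records: chunk.to_vec(), bookmark: None })
        .collect();
    if let Some(last) = pages.last_mut() {
        last.bookmark = bookmark;
    }
    pages
}
```

Native streaming sources avoid the up-front Vec entirely. That is why they override stream_pages instead of relying on this kind of in-memory chunking.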
Pipeline
Connect any source to any sink:
use ;
// Batch mode: fetch all, then write
let result = new.run.await?;
println!;
// Streaming mode: write page-by-page (bounded memory)
let result = run_stream.await?;
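The difference between the two modes comes down to how much is held in memory at once. The sketch below is not the crate's Pipeline. VecSink, run_batch_sketch, and run_streaming_sketch are hypothetical, synchronous stand-ins that return the peak number of records buffered.

```rust
/// Hypothetical in-memory sink that records each batch it receives.
struct VecSink {
    written: Vec<Vec<u32>>,
}

impl VecSink {
    fn write_batch(&mut self, batch: &[u32]) {
        self.written.push(batch.to_vec());
    }
}

/// Batch mode: collect every page first, then write once.
/// Returns the peak number of records buffered (the whole result set).
fn run_batch_sketch(pages: impl Iterator<Item = Vec<u32>>, sink: &mut VecSink) -> usize {
    let all: Vec<u32> = pages.flatten().collect();
    sink.write_batch(&all);
    all.len()
}

/// Streaming mode: write each page as it arrives.
/// Returns the peak number of records buffered (the largest single page).
fn run_streaming_sketch(pages: impl Iterator<Item = Vec<u32>>, sink: &mut VecSink) -> usize {
    let mut peak: usize = 0;
    for page in pages {
        peak = peak.max(page.len());
        sink.write_batch(&page);
    }
    peak
}
```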
Error Types
FaucetError covers all failure modes:
| Variant | Use Case |
|---|---|
| Http(reqwest::Error) | HTTP transport errors |
| HttpStatus { status, url, body } | Non-success HTTP responses |
| Json(serde_json::Error) | JSON parse/serialize errors |
| JsonPath(String) | JSONPath extraction failures |
| Auth(String) | Authentication errors |
| RateLimited { retry_after } | 429 rate limit responses |
| Url(String) | URL construction errors |
| Transform(String) | Record transform errors |
| Config(String) | Configuration/validation errors |
| Source(String) | Source-specific errors |
| Sink(String) | Sink-specific errors |
| Custom(Box<dyn Error>) | Wrap any third-party error |
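Callers typically branch on the variant to decide what to do next. The enum below is a hypothetical, trimmed-down mirror of three FaucetError variants. The real field types, such as retry_after's, may differ. The retry policy is an illustrative choice, not behaviour documented for faucet-core:

- On a 429, honour retry_after.
- On a 5xx, retry after the default backoff.
- Otherwise, give up.

```rust
use std::time::Duration;

// Hypothetical mirror of a few FaucetError variants; real field types may differ.
#[derive(Debug)]
enum SketchError {
    HttpStatus { status: u16, url: String, body: String },
    RateLimited { retry_after: Option<Duration> },
    Config(String),
}

/// Illustrative retry policy: how long to wait before retrying, or None to give up.
fn retry_delay(err: &SketchError, default_backoff: Duration) -> Option<Duration> {
    match err {
        // Honour the server's hint when present, else fall back to the default.
        SketchError::RateLimited { retry_after } => Some(retry_after.unwrap_or(default_backoff)),
        // Treat 5xx responses as transient; other statuses are not retried.
        SketchError::HttpStatus { status, .. } if *status >= 500 => Some(default_backoff),
        // Config and everything else is permanent.
        _ => None,
    }
}
```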
Config Loading
Load any Deserialize-able config struct from JSON files or environment variables:
use ;
// From a JSON file
let config: MyConfig = load_json?;
// From environment variables (reads MYAPP_URL, MYAPP_BATCH_SIZE, etc.)
let config: MyConfig = load_env?;
// From a .env file + environment variables
let config: MyConfig = load_env_file?;
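The environment-variable convention above (MYAPP_URL → url, MYAPP_BATCH_SIZE → batch_size) can be sketched with the standard library alone. This assumes each variable name is the prefix, an underscore, then the upper-cased field name. MyConfig, prefixed_vars, and load_my_config are hypothetical. The real load_env deserializes through serde rather than by hand. Taking the variables as an iterator keeps the logic testable; pass std::env::vars() to read the real process environment.

```rust
use std::collections::HashMap;

/// Hypothetical config struct; field names map to MYAPP_URL and MYAPP_BATCH_SIZE.
#[derive(Debug, PartialEq)]
struct MyConfig {
    url: String,
    batch_size: usize,
}

/// Keeps variables named `<PREFIX>_<FIELD>` and returns them keyed by lower-cased FIELD.
fn prefixed_vars(prefix: &str, vars: impl IntoIterator<Item = (String, String)>) -> HashMap<String, String> {
    let needle = format!("{}_", prefix);
    vars.into_iter()
        .filter_map(|(key, value)| {
            key.strip_prefix(needle.as_str())
                .map(|field| (field.to_lowercase(), value))
        })
        .collect()
}

/// Builds MyConfig from an iterator of (name, value) pairs.
fn load_my_config(prefix: &str, vars: impl IntoIterator<Item = (String, String)>) -> Result<MyConfig, String> {
    let map = prefixed_vars(prefix, vars);
    let url = map.get("url").cloned().ok_or("missing URL variable")?;
    let batch_size = map
        .get("batch_size")
        .ok_or("missing BATCH_SIZE variable")?
        .parse::<usize>()
        .map_err(|e| format!("invalid BATCH_SIZE: {}", e))?;
    Ok(MyConfig { url, batch_size })
}

/// Reads the real process environment.
fn load_my_config_from_env(prefix: &str) -> Result<MyConfig, String> {
    load_my_config(prefix, std::env::vars())
}
```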
Duration Serde Helpers
For Duration fields in configs, use the provided serde modules:
use std::time::Duration;
use ;
Record Transforms
Transform records as they flow through the pipeline:
use RecordTransform;
// Flatten nested objects: {"user": {"id": 1}} -> {"user__id": 1}
Flatten
// Convert keys to snake_case (or camel / pascal / kebab / screaming_snake)
KeysCase
// Regex key renaming
RenameKeys
// Custom closure
custom
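To make the Flatten example concrete, here is a std-only sketch of the same idea. It uses a tiny hypothetical Value enum in place of serde_json::Value and joins nested keys with the "__" separator shown in the comment above. It does not cover how the real transform treats arrays or empty objects; this sketch simply drops empty objects.

```rust
use std::collections::BTreeMap;

// Minimal JSON-like value so the sketch runs without serde_json.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Num(i64),
    Str(String),
    Obj(BTreeMap<String, Value>),
}

/// Flattens nested objects, joining key paths with `sep`.
fn flatten(obj: &BTreeMap<String, Value>, sep: &str) -> BTreeMap<String, Value> {
    let mut out = BTreeMap::new();
    flatten_into(obj, "", sep, &mut out);
    out
}

fn flatten_into(obj: &BTreeMap<String, Value>, prefix: &str, sep: &str, out: &mut BTreeMap<String, Value>) {
    for (key, value) in obj {
        // Build the dotted-style path, e.g. "user" + "__" + "id".
        let path = if prefix.is_empty() { key.clone() } else { format!("{}{}{}", prefix, sep, key) };
        match value {
            // Recurse into nested objects; an empty object contributes no keys.
            Value::Obj(inner) => flatten_into(inner, &path, sep, out),
            // Scalars land at their full path.
            leaf => {
                out.insert(path, leaf.clone());
            }
        }
    }
}
```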
Replication
Incremental replication support:
use ReplicationMethod;
use ;
// Filter records newer than a bookmark
let filtered = filter_incremental;
// Compute new bookmark from records
let new_bookmark = max_replication_value;
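The two replication helpers reduce to a filter and a max. This sketch assumes a hypothetical Row type whose replication key is an RFC 3339 UTC timestamp string. Such strings sort chronologically when they share the same format and time zone. It also assumes "newer than" means strictly greater. The real helpers work on JSON records and a configured replication_key.

```rust
// Hypothetical record shape; the real helpers operate on JSON records.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    id: u32,
    updated_at: String,
}

/// Keeps rows whose replication value is strictly greater than the bookmark.
/// (Whether the real helper uses > or >= is an assumption here.)
fn filter_incremental_sketch(rows: Vec<Row>, bookmark: Option<&str>) -> Vec<Row> {
    match bookmark {
        // First run: no bookmark yet, so everything is new.
        None => rows,
        Some(b) => rows.into_iter().filter(|r| r.updated_at.as_str() > b).collect(),
    }
}

/// The next bookmark is the largest replication value seen, if any.
fn max_replication_value_sketch(rows: &[Row]) -> Option<String> {
    rows.iter().map(|r| r.updated_at.clone()).max()
}
```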
Schema Inference
Infer JSON Schema from record samples:
use infer_schema;
let schema = infer_schema;
// Returns a JSON Schema with inferred types, nullable fields, nested objects
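The core of schema inference is a per-field fold over the samples. In this sketch, a field collects every non-null JSON type it was seen with. It is marked nullable if any sample holds null or omits the field entirely. Value, FieldSchema, and infer_schema_sketch are hypothetical names, and nested objects are omitted for brevity.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Minimal JSON-like scalar so the sketch runs without serde_json.
#[derive(Debug)]
enum Value {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
}

fn type_name(v: &Value) -> &'static str {
    match v {
        Value::Null => "null",
        Value::Bool(_) => "boolean",
        Value::Num(_) => "number",
        Value::Str(_) => "string",
    }
}

#[derive(Debug, Default, PartialEq)]
struct FieldSchema {
    types: BTreeSet<&'static str>,
    nullable: bool,
}

/// Infers per-field types from sample records.
fn infer_schema_sketch(samples: &[BTreeMap<String, Value>]) -> BTreeMap<String, FieldSchema> {
    let mut fields: BTreeMap<String, FieldSchema> = BTreeMap::new();
    for record in samples {
        for (key, value) in record {
            let entry = fields.entry(key.clone()).or_default();
            match value {
                // An explicit null makes the field nullable without adding a type.
                Value::Null => entry.nullable = true,
                v => {
                    entry.types.insert(type_name(v));
                }
            }
        }
    }
    // A field absent from any sample is also nullable.
    for (key, schema) in fields.iter_mut() {
        if samples.iter().any(|r| !r.contains_key(key)) {
            schema.nullable = true;
        }
    }
    fields
}
```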
JSON Schema Generation
All config structs derive schemars::JsonSchema. Use schema_for! to generate schemas:
use ;
use ;
let schema = schema_for!;
let json = to_value?;
Re-exports
faucet-core re-exports common dependencies so connector authors only need one dependency:
| Re-export | From |
|---|---|
| async_trait | async-trait |
| serde_json, Value, json! | serde_json |
| schemars, JsonSchema, schema_for! | schemars |
Quality checks
Add a quality: block (sibling of transforms: and dlq: under pipeline:) to
assert invariants on every page of records before they reach the sink. The quality
pass runs after transforms and before write_batch; per-record checks
partition the page into survivors and quarantined rows, then per-batch checks run
over the survivors. Quarantined rows are routed to the DLQ sink.
Enable the quality Cargo feature for the base checks, or quality-jsonschema to also get the
json_schema record check.
Check catalog
Per-record checks — each record is evaluated in declared order; first failure
wins. on_failure may be quarantine (route the row to the DLQ) or abort (fail
the run immediately with FaucetError::QualityFailure).
| Check | Key config fields | Passes when | Missing field |
|---|---|---|---|
| not_null | field, treat_missing_as_null (default true) | value present and non-null | fail (pass iff treat_missing_as_null: false) |
| not_empty | field | value is a non-empty string after trim() | fail |
| regex_match | field, pattern | value is a string matching pattern | fail |
| value_in_set | field, values: [...] | value is in values (exact JSON equality) | fail |
| not_in_set | field, values: [...] | value is NOT in values | pass (trivially not in set) |
| compare | field, op (gt/gte/lt/lte/eq/ne), value | ordering or equality holds | fail |
| type_is | field, expected (boolean/number/string/array/object/null) | JSON type matches | fail |
| string_length | field, min?, max? (at least one required) | char count in [min, max] | fail |
| json_schema (quality-jsonschema feature) | schema (JSON Schema doc) | record validates against schema | (whole-record check; always evaluated) |
json_schema is the most expressive check; its cost scales with schema
complexity — for very large or deeply nested schemas on hot paths, prefer the
granular checks above and benchmark your case.
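The per-record pass can be sketched as follows. Only three of the catalog's checks are modelled: not_null with treat_missing_as_null: true, not_empty, and value_in_set. Records are string-valued maps, with None standing in for JSON null. The types and function names are hypothetical. The control flow is the point:

- Checks run in declared order, and the first failure wins.
- abort stops the page.
- quarantine sets the row aside and records which check failed.

```rust
use std::collections::BTreeMap;

type Record = BTreeMap<String, Option<String>>; // None models JSON null

#[derive(Clone, Copy, Debug, PartialEq)]
enum OnFailure {
    Quarantine,
    Abort,
}

enum Check {
    NotNull { field: String },
    NotEmpty { field: String },
    ValueInSet { field: String, values: Vec<String> },
}

impl Check {
    fn passes(&self, r: &Record) -> bool {
        match self {
            // Missing counts as null here (treat_missing_as_null: true).
            Check::NotNull { field } => matches!(r.get(field), Some(Some(_))),
            Check::NotEmpty { field } => matches!(r.get(field), Some(Some(s)) if !s.trim().is_empty()),
            Check::ValueInSet { field, values } => match r.get(field) {
                Some(Some(v)) => values.contains(v),
                _ => false,
            },
        }
    }
}

struct Outcome {
    survivors: Vec<Record>,
    /// Quarantined rows paired with the index of the check that failed.
    quarantined: Vec<(Record, usize)>,
}

/// Evaluates checks in declared order; the first failing check decides the record's fate.
fn apply_record_checks(checks: &[(Check, OnFailure)], page: Vec<Record>) -> Result<Outcome, String> {
    let mut out = Outcome { survivors: Vec::new(), quarantined: Vec::new() };
    'records: for record in page {
        for (idx, (check, on_failure)) in checks.iter().enumerate() {
            if !check.passes(&record) {
                match on_failure {
                    OnFailure::Abort => return Err(format!("check #{} failed; aborting run", idx)),
                    OnFailure::Quarantine => {
                        out.quarantined.push((record, idx));
                        continue 'records;
                    }
                }
            }
        }
        out.survivors.push(record);
    }
    Ok(out)
}
```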
Per-batch checks — evaluated per page over the survivors (records that passed
the per-record pass). on_failure for aggregate checks (row_count, null_rate,
distinct_count) may be abort or quarantine_batch (route all current survivors
to the DLQ, write nothing this page). unique is row-attributable and accepts
quarantine (route the duplicate rows) or abort.
| Check | Key config fields | Passes when |
|---|---|---|
| row_count | min?, max? (at least one required) | survivor count in [min, max] |
| null_rate | field, max: f64 (0.0–1.0) | null-or-missing rate ≤ max; zero survivors → 0.0 → pass |
| unique | fields: [...] (composite key, ≥1) | every survivor's key is unique within the page |
| distinct_count | field, min?, max? | distinct values of field in [min, max] |
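Two of the batch checks can be sketched over the survivors of a page. The Record type and function names are hypothetical. Two behaviours are assumptions rather than documented: missing and null are treated the same inside a composite key, and the first occurrence of a key is kept while later duplicates are flagged.

```rust
use std::collections::{BTreeMap, HashSet};

type Record = BTreeMap<String, Option<String>>; // None models JSON null

/// null_rate: share of survivors whose field is null or missing; zero survivors -> 0.0.
fn null_rate(survivors: &[Record], field: &str) -> f64 {
    if survivors.is_empty() {
        return 0.0;
    }
    let nulls = survivors
        .iter()
        .filter(|r| !matches!(r.get(field), Some(Some(_))))
        .count();
    nulls as f64 / survivors.len() as f64
}

/// unique: indices of rows whose composite key already appeared earlier in the page.
fn duplicate_indices(survivors: &[Record], fields: &[&str]) -> Vec<usize> {
    let mut seen = HashSet::new();
    let mut dups = Vec::new();
    for (i, r) in survivors.iter().enumerate() {
        // Missing and null both become None in the key.
        let key: Vec<Option<String>> = fields.iter().map(|f| r.get(*f).cloned().flatten()).collect();
        if !seen.insert(key) {
            dups.push(i);
        }
    }
    dups
}
```

With on_failure: quarantine, the indices returned by duplicate_indices are the rows that would be routed to the DLQ.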
on_failure policies
| Policy | Meaning | Allowed on |
|---|---|---|
| quarantine | Route the specific offending row(s) to the DLQ; keep the rest | per-record checks; unique |
| quarantine_batch | Route all survivors of the page to the DLQ; write nothing this page | aggregate batch checks |
| abort | Surface FaucetError::QualityFailure and fail the run | every check |
quarantine and quarantine_batch require a DLQ sink. Configuring either
without a dlq: block is rejected at config-load time with FaucetError::Config.
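The policy table and the DLQ rule amount to a small validation step at config load. The sketch below is hypothetical. The enums and validate_policy are illustrative, and they return String errors rather than FaucetError::Config.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum OnFailure {
    Quarantine,
    QuarantineBatch,
    Abort,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum CheckKind {
    PerRecord,
    Unique,
    /// row_count, null_rate, distinct_count
    Aggregate,
}

/// Mirrors the "Allowed on" column of the policy table plus the DLQ requirement.
fn validate_policy(kind: CheckKind, on_failure: OnFailure, has_dlq: bool) -> Result<(), String> {
    let allowed = match on_failure {
        OnFailure::Abort => true,
        OnFailure::Quarantine => matches!(kind, CheckKind::PerRecord | CheckKind::Unique),
        OnFailure::QuarantineBatch => kind == CheckKind::Aggregate,
    };
    if !allowed {
        return Err(format!("{:?} is not allowed on {:?} checks", on_failure, kind));
    }
    // Both quarantine policies need somewhere to send rows.
    if on_failure != OnFailure::Abort && !has_dlq {
        return Err(format!("{:?} requires a dlq: block", on_failure));
    }
    Ok(())
}
```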
Example
pipeline:
source:
type: rest
config:
base_url: https://api.example.com/v1
path: /users
method: GET
auth:
pagination:
max_retries: 3
retry_backoff: 2
tolerated_http_errors:
replication_method:
replication_key: updated_at
primary_keys:
partitions:
schema_sample_size: 100
transforms:
- type: keys_case
config:
quality:
record:
- type: not_null
field: id
on_failure: abort
- type: not_null
field: email
on_failure: quarantine
- type: regex_match
field: email
pattern: '^[^@\s]+@[^@\s]+\.[^@\s]+$'
on_failure: quarantine
- type: value_in_set
field: status
values:
on_failure: quarantine
batch:
- type: row_count
min: 1
on_failure: abort
- type: unique
fields:
on_failure: quarantine
dlq:
sink:
type: jsonl
config:
max_failures_per_page: 50
max_failures_total: 500
sink:
type: postgres
config:
connection_url: "${env:PG_URL}"
table_name: users
column_mapping:
batch_size: 500
Rust API
use ;
let quality_spec: QualitySpec = from_value?;
let compiled = compile?;
let result = new
.with_dlq // required when any check uses quarantine
.with_quality
.run
.await?;
Modules
| Module | Contents |
|---|---|
| traits | Source and Sink async traits |
| error | FaucetError enum |
| pipeline | Pipeline, PipelineResult, run_stream |
| config | load_json, load_env, load_env_file, duration serde helpers |
| transform | RecordTransform, CompiledTransform, plus support enums (CastType, CastOnError, ValueCaseMode) |
| replication | ReplicationMethod, filter_incremental, max_replication_value |
| schema | infer_schema |
| stage | TransformStage, FilterSpec, ExplodeSpec, OnMissing. TransformStage is the pipeline-level stage type: it wraps RecordTransform (1→1) and adds filter (1→0 or 1) and explode (1→0..N). See docs/book/src/cookbook/transforms.md for the merge rule and JSONPath subset. |
| quality | Per-record and per-batch data-quality checks: QualitySpec config, CompiledQuality, apply_quality, QualityOutcome. Gated on the quality / quality-jsonschema features. |
| util | quote_ident, extract_records, check_http_response |
License
Licensed under either of MIT or Apache-2.0 at your option.