faucet-stream
A declarative, config-driven data pipeline with pluggable source and sink connectors.
📖 Guide, tutorials & cookbook: https://pawansikawat.github.io/faucet-stream/
Feature flags
| Feature | Description |
|---|---|
| `source-rest` (default) | REST API source with pagination, auth, transforms |
| `source-graphql` | GraphQL API source with cursor pagination |
| `source-xml` | XML/SOAP API source with XML-to-JSON conversion |
| `source-grpc` | gRPC source with dynamic protobuf messages |
| `source-postgres` | PostgreSQL query source |
| `source-postgres-cdc` | PostgreSQL CDC source (logical replication) |
| `source-mysql` | MySQL query source |
| `source-mssql` | Microsoft SQL Server query source |
| `source-sqlite` | SQLite query source |
| `source-s3` | AWS S3 file source |
| `source-mongodb` | MongoDB query source |
| `source-redis` | Redis source (streams, lists, keys) |
| `source-webhook` | Webhook HTTP receiver source |
| `source-websocket` | WebSocket streaming source |
| `source-csv` | CSV file source |
| `source-elasticsearch` | Elasticsearch search/scroll source |
| `source-kafka` | Apache Kafka consumer source |
| `source-parquet` | Apache Parquet file source (local, glob, S3) |
| `sink-bigquery` | Google BigQuery streaming insert sink |
| `sink-postgres` | PostgreSQL sink (jsonb or auto-mapped columns) |
| `sink-jsonl` | JSON Lines file sink |
| `sink-snowflake` | Snowflake SQL REST API sink |
| `sink-mysql` | MySQL sink |
| `sink-mssql` | Microsoft SQL Server sink |
| `sink-sqlite` | SQLite sink |
| `sink-s3` | AWS S3 file sink |
| `sink-mongodb` | MongoDB insert sink |
| `sink-redis` | Redis sink (streams, lists, key-value) |
| `sink-csv` | CSV file sink |
| `sink-elasticsearch` | Elasticsearch bulk index sink |
| `sink-http` | HTTP POST sink |
| `sink-kafka` | Apache Kafka producer sink |
| `sink-parquet` | Apache Parquet file sink (local, S3) |
| `kafka-schema-registry` | Schema Registry support for Kafka connectors |
| `source` | All source connectors |
| `sink` | All sink connectors |
| `full` | Every connector |
Modules
- `adaptive` - Adaptive batch sizing: an AIMD controller that auto-tunes the effective write batch size per pipeline row from observed sink latency + error rate. Pure logic (no I/O); `run_stream` feeds it observations and emits metrics. See `docs/superpowers/specs/2026-05-31-adaptive-batch-sizing-design.md`.
- `async_stream` - Asynchronous stream of elements.
- `auth` (requires `auth`) - Single-flight OAuth2 / token-endpoint auth providers (enable the `auth` feature). Share one across connectors via `with_auth_provider` or the CLI `auth: { ref }` catalog.
- `check` - Preflight check types for `faucet doctor` (#126).
- `common_gcs` (requires `sink-gcs` or `source-gcs`)
- `common_kafka` (requires `sink-kafka` or `source-kafka`)
- `compression` (requires `compression`) - Transparent gzip / zstd compression wrappers for file-shaped connectors.
- `config` - Configuration loading utilities.
- `dlq` - Dead-letter queue (DLQ) wiring shared by the pipeline runner.
- `error` - Error types for faucet-stream.
- `futures_core` - Core traits and types for asynchronous operations in Rust.
- `observability` - Pipeline-internal observability: tracing spans and `metrics` counters / histograms wired automatically around every source, sink, transform, and state-store operation. See `docs/superpowers/specs/2026-05-23-observability-otel-prometheus-design.md`.
- `pipeline` - Source-to-sink pipeline orchestration.
- `quality` (requires `quality`) - Data-quality checks: declarative per-record and per-batch assertions that quarantine violating records to the DLQ or abort the run. Pure evaluation; the pipeline wires the DLQ routing in `run_stream`.
- `replication` - Incremental replication support.
- `retry` - Shared exponential-backoff retry executor for HTTP-style sources.
- `schema` - JSON Schema inference from record samples.
- `schemars` - Schemars
- `sink`
- `source` (requires `source-rest`)
- `stage` - Pipeline-level transform stages. A `TransformStage` wraps one of four shapes:
- `state`
- `traits` - Shared traits for faucet sources and sinks.
- `transform` - Record transformation pipeline.
- `transforming_source` - Wrap any `Source` with a fixed list of `TransformStage`s applied to every emitted record. The canonical way for library callers to attach stages (transforms wrapped via `TransformStage::Map`, plus `Filter` / `Explode` / `Custom`); the CLI uses this same type internally.
- `util` - Shared utilities used across faucet source and sink crates.
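The AIMD idea behind the `adaptive` module can be illustrated with a small, self-contained sketch: grow the batch size by a fixed step after each healthy write, and cut it by a factor after a failed or slow write. Everything below (`AimdSketch`, its fields, and the rule that latency above a target counts as congestion) is hypothetical and is not the crate's `AimdController` or `AdaptiveBatchConfig` API.

```rust
use std::time::Duration;

/// Hypothetical AIMD controller for illustration only; the field names and the
/// "latency above target counts as congestion" rule are assumptions, not the
/// crate's `AimdController` API.
struct AimdSketch {
    size: usize,              // current effective batch size
    min: usize,               // never shrink below this
    max: usize,               // never grow above this
    step: usize,              // additive increase per healthy write
    factor: f64,              // multiplicative decrease, e.g. 0.5 halves the size
    latency_target: Duration, // writes slower than this count as congestion
}

impl AimdSketch {
    /// Feed one observed write outcome and return the new batch size.
    fn observe(&mut self, latency: Duration, failed: bool) -> usize {
        if failed || latency > self.latency_target {
            // Multiplicative decrease: back off quickly when the sink struggles.
            let shrunk = (self.size as f64 * self.factor).floor() as usize;
            self.size = shrunk.max(self.min);
        } else {
            // Additive increase: probe upward slowly while writes are healthy.
            self.size = (self.size + self.step).min(self.max);
        }
        self.size
    }
}

fn main() {
    let mut ctl = AimdSketch {
        size: 100,
        min: 10,
        max: 1_000,
        step: 50,
        factor: 0.5,
        latency_target: Duration::from_millis(200),
    };
    println!("{}", ctl.observe(Duration::from_millis(50), false)); // healthy: 150
    println!("{}", ctl.observe(Duration::from_millis(500), false)); // slow: 75
    println!("{}", ctl.observe(Duration::from_millis(50), true)); // failed: 37
}
```

Growing slowly and shrinking fast is what defines AIMD: the batch size climbs in small steps until the sink shows strain, then drops sharply, giving the characteristic sawtooth.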
Macros
- `json` - Construct a `serde_json::Value` from a JSON literal.
- `schema_for` - Generates a `Schema` for the given type using default settings. The default settings currently conform to JSON Schema 2020-12, but this is liable to change in a future version of Schemars if support for other JSON Schema versions is added.
Structs
- `AdaptiveBatchConfig` - Configuration for the adaptive batch-size controller. Lives under `execution.adaptive_batch_size`. Default `enabled = false` (opt-in); when disabled, the pipeline writes each page exactly as before.
- `Adjustment` - A size change the controller decided on.
- `AimdController` - Additive-increase / multiplicative-decrease controller. Pure + deterministic.
- `AuthReference` - A `{ ref: <name> }` pointer to a named provider in the top-level `auth:` catalog. The only permitted key is `ref`.
- `CancellationToken` - A token which can be used to signal a cancellation request to one or more tasks.
- `CheckContext` - Inputs a probe may need. The doctor command enforces `timeout` on the whole `check()` call; connectors may also use it to bound their own client calls.
- `CheckReport` - A connector’s full preflight report.
- `CheckTally` - Per-check counters + elapsed time, keyed by check name. Emitted as metrics by the observability wrapper.
- `CompiledQuality` - A fully compiled quality spec. Built once via `CompiledQuality::compile`.
- `DlqConfig` - Pipeline-level DLQ wiring.
- `DlqStats` - Counters returned alongside `PipelineResult` when a DLQ is wired.
- `DurationGuard` - On `Drop`, records the elapsed time since construction into the named histogram with the supplied labels. Recording on drop guarantees a sample even if the surrounding future is cancelled or panics.
- `ExplodeSpec` (requires `transform-explode`) - Spec for `TransformStage::Explode`. Fans one record out into N records based on the array at `path`. Compiled into a `CompiledExplode` at pipeline-build time; per-record work is one path resolve + N clones + N merges.
- `FileStateStore` - File-backed `StateStore`. Each key maps to a JSON file at `{root}/{safe_filename(key)}.json`, written via atomic rename. The filename stem percent-encodes `:` as `%3A` so keys using the `pipeline:rest:issues` convention are valid on Windows.
- `FilterSpec` (requires `transform-filter`) - Predicate spec for `TransformStage::Filter`. Compiled into a `CompiledFilter` at pipeline-build time; per-record work is a single path resolve + comparison.
- `InstallReport` - Report from `install_observability` so callers can log what actually happened (recorder installed vs. already-installed vs. disabled).
- `InstrumentedSink` - Wraps a `&dyn Sink` (or any `&S: Sink`) and emits spans + metrics around `write_batch` and `flush`. Constructed by `Pipeline::run`.
- `InstrumentedSource` - Wraps a `&dyn Source` (or any `&S: Source`) and emits spans + metrics around every call. Constructed by `Pipeline::run` and never exposed to end users; the wrapped source remains the user-facing object.
- `InstrumentedStateStore` - Wraps an `Arc<dyn StateStore>` and emits `faucet.state.*` spans + metrics around every operation.
- `Labels` - Common labels carried by every span and metric.
- `MemoryStateStore` - In-memory `StateStore` for tests and ephemeral pipelines.
- `ObservabilityConfig` - Configuration for `install_observability`. Either or both sections may be `None`; unset sections install nothing.
- `Observation` - One observed sub-batch write outcome fed to the controller.
- `Pipeline` - A pipeline that moves data from a `Source` to a `Sink`.
- `PipelineResult` - Result of a pipeline run.
- `Probe` - One named probe within a `CheckReport` (e.g. `"read"`, `"auth"`, `"network"`, `"permissions"`, `"schema"`, `"io"`, `"sentinel"`).
- `PrometheusConfig`
- `QualityOutcome` - Result of applying the quality pass to one page.
- `QualitySpec` - The `quality:` config block. Per-record checks run first (partitioning the page into survivors + quarantined); per-batch checks then run over the survivors.
- `QuarantinedRecord` - A record removed from the page by a quality check, destined for the DLQ.
- `ResponseValidator` (requires `source-rest`) - Optional callback to decide whether the token endpoint response is successful.
- `RestStream` (requires `source-rest`) - A configured REST API stream that handles pagination, auth, and extraction.
- `RestStreamConfig` (requires `source-rest`) - Configuration for a `RestStream`.
- `RunStreamOptions`
- `StreamPage` - One page emitted by `Source::stream_pages`.
- `TracingConfig`
- `TransformingSource` - Source decorator that applies a fixed list of compiled stages to every record. Emits `faucet_transform_*` metrics per page via `instrumented_apply_stages`.
Enums
- `AdjustDirection` - Direction of a batch-size adjustment (metric label).
- `AdjustReason` - Why an adjustment happened (metric label).
- `Auth` (requires `source-rest`) - Supported authentication methods.
- `AuthSpec` - A connector’s `auth:` field: either an inline auth definition `A` (the `{ type, config }` shape), or a `{ ref: <name> }` reference to a shared provider defined in the top-level `auth:` catalog.
- `BatchCheck` - A per-batch check, evaluated per page over the survivors of the per-record pass.
- `CastOnError` (requires `transform-cast`) - Failure policy for `RecordTransform::Cast`. Default: `Error`.
- `CastType` (requires `transform-cast`) - Target type for `RecordTransform::Cast`.
- `CompareOp` - Ordering / equality operator for the `compare` check.
- `Compression` - Internal post-resolution codec. No `Auto` variant; call `CompressionConfig::resolve`.
- `CompressionConfig` - User-facing compression config. Defaults to `Auto`.
- `Credential` - A resolved credential produced by an `AuthProvider` or built from inline auth config. Connectors map this onto their wire protocol (HTTP header, gRPC metadata, …).
- `DlqReason` - Reason a page produced DLQ traffic. Used as a metric label and span attribute; closed-set enum so cardinality stays bounded.
- `FaucetError` - All possible errors returned by faucet-stream.
- `FilterOp` (requires `transform-filter`) - Comparison operator for `FilterSpec`.
- `InstallError`
- `JsonType` - Expected JSON type for the `type_is` check.
- `KeyCaseMode` (requires `transform-keys-case`) - Output convention for `RecordTransform::KeysCase`.
- `OnBatchError` - Policy applied when a sink reports an outer failure (the whole batch failed, no per-row info).
- `OnFailure` - What to do when a check fails. The allowed subset is validated per check at compile time (see `compile.rs`).
- `OnMissing` (requires `transform-explode`) - Behaviour when an `ExplodeSpec`’s `path` doesn’t yield a non-empty array. The default is `Passthrough` because silently dropping records is the worst failure mode for ETL pipelines; surfacing the record unchanged lets downstream stages decide.
- `PaginationStyle` (requires `source-rest`) - Supported pagination strategies.
- `ProbeStatus` - Outcome of a single probe.
- `RecordCheck` - A per-record check. The addressed field accepts the filter/explode path subset (bare key, `dot.path`, `$['bracketed']`).
- `RecordTransform` - A transformation applied to every record fetched by a source (e.g. the REST source’s `RestStream`).
- `ReplicationMethod` - Determines how records are replicated from the source.
- `TransformStage` - One stage in a transform pipeline.
- `Value` - Represents any valid JSON value.
- `ValueCaseMode` (requires `transform-value-case`) - String-value casing mode for `RecordTransform::ValueCase`.
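The split between `CompressionConfig` (user-facing, defaults to `Auto`) and `Compression` (post-resolution, no `Auto`) can be sketched as two enums joined by a `resolve` step. The types below are hypothetical mirrors, not the crate's definitions: the `None` / `Gzip` / `Zstd` variant names, the file-name parameter, and the `.gz` / `.zst` extension rules are assumptions inferred from the gzip / zstd support described for the `compression` module.

```rust
/// Hypothetical mirror of the user-facing config; only `Auto` (the default)
/// comes from the docs, the other variant names are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum CompressionConfigSketch {
    #[default]
    Auto,
    None,
    Gzip,
    Zstd,
}

/// Hypothetical post-resolution codec: deliberately has no `Auto` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompressionSketch {
    None,
    Gzip,
    Zstd,
}

impl CompressionConfigSketch {
    /// Explicit choices pass through; `Auto` is resolved from the file name
    /// (assumed extension rules: `.gz` and `.zst`).
    fn resolve(self, filename: &str) -> CompressionSketch {
        match self {
            Self::None => CompressionSketch::None,
            Self::Gzip => CompressionSketch::Gzip,
            Self::Zstd => CompressionSketch::Zstd,
            Self::Auto => {
                if filename.ends_with(".gz") {
                    CompressionSketch::Gzip
                } else if filename.ends_with(".zst") {
                    CompressionSketch::Zstd
                } else {
                    CompressionSketch::None
                }
            }
        }
    }
}

fn main() {
    let cfg = CompressionConfigSketch::default();
    println!("{:?}", cfg.resolve("events.jsonl.gz"));
}
```

Because the resolved type has no `Auto` variant, code that actually compresses or decompresses can match on it exhaustively without an unreachable `Auto` arm.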
Constants
- `DEFAULT_BATCH_SIZE` - Default page size used when a caller does not specify one.
- `DEFAULT_EXPIRY_RATIO` (requires `source-rest`) - Default fraction of `expires_in` after which the token is refreshed.
- `DEFAULT_TOKEN_ENDPOINT_EXPIRY_RATIO` (requires `source-rest`) - Default fraction of `expires_in` after which the token is refreshed.
- `MAX_BATCH_SIZE` - Hard upper bound on `batch_size`. Values above this (other than the special `0` “no batching” sentinel) are rejected at config validation time to prevent accidental O(total) buffering in the default implementation of `Source::stream_pages`.
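The `MAX_BATCH_SIZE` rule can be expressed as a small validation function. This is a sketch only: `MAX_BATCH_SIZE_SKETCH` uses an arbitrary placeholder value because the real bound is not stated here, and the `Result<usize, String>` signature is an assumption rather than the signature of the crate's `validate_batch_size`.

```rust
/// Hypothetical bound for illustration; the real value of `MAX_BATCH_SIZE`
/// is not stated on this page.
const MAX_BATCH_SIZE_SKETCH: usize = 100_000;

/// Sketch of the documented rule: `0` is the "no batching" sentinel and is
/// accepted; any other value must not exceed the upper bound.
fn validate_batch_size_sketch(batch_size: usize) -> Result<usize, String> {
    match batch_size {
        0 => Ok(0), // sentinel: no batching
        n if n <= MAX_BATCH_SIZE_SKETCH => Ok(n),
        n => Err(format!(
            "batch_size {n} exceeds the maximum of {MAX_BATCH_SIZE_SKETCH}"
        )),
    }
}

fn main() {
    for candidate in [0, 500, MAX_BATCH_SIZE_SKETCH + 1] {
        println!("{candidate}: {:?}", validate_batch_size_sketch(candidate));
    }
}
```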
Traits
- `AuthProvider` - A live, shareable source of credentials.
- `JsonSchema` - A type which can be described as a JSON Schema document.
- `Sink` - A sink writes records to an external system.
- `Source` - A source fetches records from an external system.
- `StateStore` - Persistent key/value store for replication bookmarks and pipeline checkpoints.
- `Stream` - A stream of values produced asynchronously.
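`Source`, `Sink`, and `StateStore` meet in `run_stream`, which is described as writing each `StreamPage` to the sink as it arrives and persisting bookmarks per page. The synchronous sketch below shows that ordering with three hypothetical traits (`PageSource`, `RecordSink`, `KvStore`) standing in for the crate's async traits; none of these names or signatures come from faucet-stream.

```rust
use std::collections::HashMap;

/// Hypothetical, synchronous stand-ins for the crate's async `Source`,
/// `Sink`, `StateStore`, and `StreamPage`; the real signatures differ.
struct Page {
    records: Vec<String>,
    bookmark: Option<String>,
}

trait PageSource {
    fn next_page(&mut self) -> Option<Page>;
}

trait RecordSink {
    fn write_batch(&mut self, records: &[String]) -> Result<(), String>;
}

trait KvStore {
    fn put(&mut self, key: &str, value: String);
    fn get(&self, key: &str) -> Option<String>;
}

/// Write each page as it arrives, and persist its bookmark only after the
/// sink accepted it, so a failure never records progress past unwritten data.
fn run_stream_sketch(
    source: &mut dyn PageSource,
    sink: &mut dyn RecordSink,
    state: &mut dyn KvStore,
    state_key: &str,
) -> Result<usize, String> {
    let mut written = 0;
    while let Some(page) = source.next_page() {
        sink.write_batch(&page.records)?; // stop before touching the bookmark
        written += page.records.len();
        if let Some(bookmark) = page.bookmark {
            state.put(state_key, bookmark);
        }
    }
    Ok(written)
}

// Minimal in-memory implementations for the usage example.
struct VecSource(Vec<Page>);
impl PageSource for VecSource {
    fn next_page(&mut self) -> Option<Page> {
        if self.0.is_empty() { None } else { Some(self.0.remove(0)) }
    }
}

struct VecSink(Vec<String>);
impl RecordSink for VecSink {
    fn write_batch(&mut self, records: &[String]) -> Result<(), String> {
        self.0.extend_from_slice(records);
        Ok(())
    }
}

struct MemStore(HashMap<String, String>);
impl KvStore for MemStore {
    fn put(&mut self, key: &str, value: String) {
        self.0.insert(key.to_string(), value);
    }
    fn get(&self, key: &str) -> Option<String> {
        self.0.get(key).cloned()
    }
}

fn main() {
    let mut source = VecSource(vec![
        Page { records: vec!["a".into(), "b".into()], bookmark: Some("p1".into()) },
        Page { records: vec!["c".into()], bookmark: Some("p2".into()) },
    ]);
    let mut sink = VecSink(Vec::new());
    let mut state = MemStore(HashMap::new());
    let n = run_stream_sketch(&mut source, &mut sink, &mut state, "pipeline:rest:issues");
    println!("{n:?} records, bookmark = {:?}", state.get("pipeline:rest:issues"));
}
```

Committing the bookmark only after a successful write gives at-least-once delivery: a crash between the two steps replays one page on restart rather than skipping it.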
Functions
- `apply_quality` - Apply the full per-page quality pass. Pure: no metrics, no DLQ I/O.
- `build_envelope` - Build a single DLQ envelope.
- `compile_stage` - Compile a `TransformStage` into its `CompiledStage` form.
- `compress_buf` - One-shot in-memory compression. Used by S3 and GCS sinks that build a full `Vec<u8>` body before upload.
- `execute_with_retry` - Execute `operation` with up to `max_retries` retries on retriable errors, using exponential backoff (`base_backoff * 2^attempt`) with random jitter. Non-retriable errors return immediately; `Ok` returns immediately.
- `fetch_oauth2_token` (requires `source-rest`) - Fetch an OAuth2 token using the client credentials grant.
- `fetch_token_from_endpoint` (requires `source-rest`) - Fetch a token from the given endpoint and extract it using JSONPath.
- `install_observability` (available without `observability-install`) - Non-`observability-install` stub. Returns an empty report, never panics.
- `instrumented_apply_quality` - Apply the quality pass and emit metrics. Returns the same outcome as `apply_quality`; on abort, increments `faucet_quality_aborts_total` before propagating the error.
- `instrumented_apply_stages` - Apply a sequence of compiled stages to every record in `records`, flat-mapping per stage. Emits one `faucet.transform.apply` span and counters `faucet_transform_records_in_total` / `_records_out_total` per call (per page).
- `register_build_info` - Register the `faucet_build_info{version}` gauge (set to 1) under the currently-installed `metrics` recorder. Safe to call from any code path that wants to ensure the gauge is set; `install_observability` invokes this automatically. Gauges are naturally idempotent under the `metrics` model: repeat calls just re-set the same value.
- `run_stream` - Run a streaming pipeline, writing each `StreamPage` to the sink as it arrives and persisting bookmarks per page.
- `update_bookmark_lag` - Update the bookmark-lag gauges if the bookmark is a parseable RFC3339 timestamp. Returns `true` if the gauges were updated, `false` otherwise.
- `validate_batch_size` - Validate a `batch_size` value against the global constraints.
- `warn_mismatch` - Log a one-shot warning when the explicit codec disagrees with the filename’s detected codec. Deduplicates per `(path, declared)` pair across the whole process so a million-object scan does not flood logs.
Type Aliases
- `RowOutcome` - Per-row outcome from `Sink::write_batch_partial`.
- `SharedAuthProvider` - A shared `AuthProvider` handle. Cloning it shares the one live provider (and its single token cache) across connectors.
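`SharedAuthProvider` is described as a shared handle whose clones share one live provider and its single token cache, and the `auth` module calls these providers single-flight. A blocking sketch of that pattern, using an `Arc` around a `Mutex` that is held across the refresh, follows. `SingleFlightProvider`, `CachedToken`, and the `fetch` closure are hypothetical, and the refresh ratio of 0.8 in the example is arbitrary rather than the crate's `DEFAULT_EXPIRY_RATIO`.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical cached token; not the crate's `Credential`.
struct CachedToken {
    value: String,
    fetched_at: Instant,
    expires_in: Duration,
}

/// Hypothetical single-flight provider; not the crate's `AuthProvider`.
struct SingleFlightProvider {
    // Held across the fetch, so concurrent callers wait for the one in-flight
    // refresh instead of each calling the token endpoint.
    cache: Mutex<Option<CachedToken>>,
    // Refresh once this fraction of `expires_in` has elapsed.
    refresh_ratio: f64,
    fetch: Box<dyn Fn() -> (String, Duration) + Send + Sync>,
}

impl SingleFlightProvider {
    fn token(&self) -> String {
        let mut cache = self.cache.lock().unwrap();
        let fresh = cache.as_ref().map_or(false, |t| {
            t.fetched_at.elapsed() < t.expires_in.mul_f64(self.refresh_ratio)
        });
        if !fresh {
            let (value, expires_in) = (self.fetch)();
            *cache = Some(CachedToken { value, fetched_at: Instant::now(), expires_in });
        }
        cache.as_ref().unwrap().value.clone()
    }
}

/// Cloning the `Arc` shares one provider and one cache, as described above.
type SharedProviderSketch = Arc<SingleFlightProvider>;

fn main() {
    let provider: SharedProviderSketch = Arc::new(SingleFlightProvider {
        cache: Mutex::new(None),
        refresh_ratio: 0.8,
        fetch: Box::new(|| ("token-abc".to_string(), Duration::from_secs(3600))),
    });
    let rest_connector = Arc::clone(&provider);
    let grpc_connector = Arc::clone(&provider);
    println!("{} {}", rest_connector.token(), grpc_connector.token());
}
```

Holding the lock during the fetch is the simplest way to get single-flight behaviour in blocking code, at the cost of making callers wait while a refresh is in progress; an async implementation would typically use an async-aware lock instead.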
Attribute Macros
Derive Macros
- `JsonSchema` - Derive macro for the `JsonSchema` trait.