Crate faucet_stream

faucet-stream

A declarative, config-driven data pipeline with pluggable source and sink connectors.

📖 Guide, tutorials & cookbook: https://pawansikawat.github.io/faucet-stream/
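As a rough mental model of "pluggable source and sink connectors", the sketch below drives any source into any sink page by page. It is a minimal, synchronous illustration under stated assumptions: the trait shapes and the names MemorySource, VecSink, and run_pipeline are hypothetical and are not faucet_stream's API. The crate's real Source and Sink traits, Pipeline, and run_stream are not reproduced here.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified source trait: yields pages of records until exhausted.
trait Source {
    fn next_page(&mut self, batch_size: usize) -> Option<Vec<String>>;
}

/// Hypothetical, simplified sink trait: accepts one batch at a time.
trait Sink {
    fn write_batch(&mut self, records: &[String]) -> Result<(), String>;
}

/// In-memory source that hands out its records in fixed-size pages.
struct MemorySource {
    records: VecDeque<String>,
}

impl Source for MemorySource {
    fn next_page(&mut self, batch_size: usize) -> Option<Vec<String>> {
        if self.records.is_empty() {
            return None;
        }
        // This sketch treats 0 as 1 to avoid an endless empty-page loop
        // (the crate itself gives batch_size = 0 its own meaning).
        let n = batch_size.max(1).min(self.records.len());
        Some(self.records.drain(..n).collect())
    }
}

/// Sink that simply collects every batch it receives.
#[derive(Default)]
struct VecSink {
    batches: Vec<Vec<String>>,
}

impl Sink for VecSink {
    fn write_batch(&mut self, records: &[String]) -> Result<(), String> {
        self.batches.push(records.to_vec());
        Ok(())
    }
}

/// Drive any source into any sink; returns how many records were moved.
fn run_pipeline(source: &mut dyn Source, sink: &mut dyn Sink, batch_size: usize) -> Result<usize, String> {
    let mut total = 0;
    while let Some(page) = source.next_page(batch_size) {
        sink.write_batch(&page)?; // stop on the first sink error
        total += page.len();
    }
    Ok(total)
}

fn main() {
    let mut source = MemorySource {
        records: (1..=5).map(|i| format!("{{\"id\":{i}}}")).collect(),
    };
    let mut sink = VecSink::default();
    let moved = run_pipeline(&mut source, &mut sink, 2).unwrap();
    println!("moved {moved} records in {} batches", sink.batches.len());
}
```

Because run_pipeline only sees trait objects, swapping a connector means swapping the value passed in; that is the property the connector feature flags rely on.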

Feature flags

| Feature | Description |
|---|---|
| source-rest (default) | REST API source with pagination, auth, transforms |
| source-graphql | GraphQL API source with cursor pagination |
| source-xml | XML/SOAP API source with XML-to-JSON conversion |
| source-grpc | gRPC source with dynamic protobuf messages |
| source-postgres | PostgreSQL query source |
| source-postgres-cdc | PostgreSQL CDC source (logical replication) |
| source-mysql | MySQL query source |
| source-mssql | Microsoft SQL Server query source |
| source-sqlite | SQLite query source |
| source-s3 | AWS S3 file source |
| source-mongodb | MongoDB query source |
| source-redis | Redis source (streams, lists, keys) |
| source-webhook | Webhook HTTP receiver source |
| source-websocket | WebSocket streaming source |
| source-csv | CSV file source |
| source-elasticsearch | Elasticsearch search/scroll source |
| source-kafka | Apache Kafka consumer source |
| source-parquet | Apache Parquet file source (local, glob, S3) |
| sink-bigquery | Google BigQuery streaming insert sink |
| sink-postgres | PostgreSQL sink (jsonb or auto-mapped columns) |
| sink-jsonl | JSON Lines file sink |
| sink-snowflake | Snowflake SQL REST API sink |
| sink-mysql | MySQL sink |
| sink-mssql | Microsoft SQL Server sink |
| sink-sqlite | SQLite sink |
| sink-s3 | AWS S3 file sink |
| sink-mongodb | MongoDB insert sink |
| sink-redis | Redis sink (streams, lists, key-value) |
| sink-csv | CSV file sink |
| sink-elasticsearch | Elasticsearch bulk index sink |
| sink-http | HTTP POST sink |
| sink-kafka | Apache Kafka producer sink |
| sink-parquet | Apache Parquet file sink (local, S3) |
| kafka-schema-registry | Schema Registry support for Kafka connectors |
| source | All source connectors |
| sink | All sink connectors |
| full | Every connector |

Modules

adaptive
Adaptive batch sizing — an AIMD controller that auto-tunes the effective write batch size per pipeline row from observed sink latency + error rate. Pure logic (no I/O); run_stream feeds it observations and emits metrics. See docs/superpowers/specs/2026-05-31-adaptive-batch-sizing-design.md.
async_stream
Asynchronous stream of elements.
auth (feature: auth)
Single-flight OAuth2 / token-endpoint auth providers (enable the auth feature). Share one across connectors via with_auth_provider or the CLI auth: { ref } catalog.
check
Preflight check types for faucet doctor (#126).
common_gcs (feature: sink-gcs or source-gcs)
common_kafka (feature: sink-kafka or source-kafka)
compression (feature: compression)
Transparent gzip / zstd compression wrappers for file-shaped connectors.
config
Configuration loading utilities.
dlq
Dead-letter queue (DLQ) wiring shared by the pipeline runner.
error
Error types for faucet-stream.
futures_core
Core traits and types for asynchronous operations in Rust.
observability
Pipeline-internal observability: tracing spans and metrics (counters and histograms) wired automatically around every source, sink, transform, and state-store operation. See docs/superpowers/specs/2026-05-23-observability-otel-prometheus-design.md.
pipeline
Source-to-sink pipeline orchestration.
quality (feature: quality)
Data-quality checks: declarative per-record and per-batch assertions that quarantine violating records to the DLQ or abort the run. Pure evaluation; the pipeline wires the DLQ routing in run_stream.
replication
Incremental replication support.
retry
Shared exponential-backoff retry executor for HTTP-style sources.
schema
JSON Schema inference from record samples.
schemars
Schemars
sink
source (feature: source-rest)
stage
Pipeline-level transform stages. A TransformStage wraps one of four shapes: Map, Filter, Explode, or Custom.
state
traits
Shared traits for faucet sources and sinks.
transform
Record transformation pipeline.
transforming_source
Wrap any Source with a fixed list of TransformStages applied to every emitted record. The canonical way for library callers to attach stages (transforms wrapped via TransformStage::Map, plus Filter / Explode / Custom); the CLI uses this same type internally.
util
Shared utilities used across faucet source and sink crates.
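The adaptive module describes an AIMD (additive-increase / multiplicative-decrease) controller that tunes the write batch size from observed sink latency and error rate, as pure logic with no I/O. The sketch below shows that technique in isolation. The struct name Aimd, its fields, the observe method, and the thresholds in main are hypothetical illustrations, not the crate's AimdController API.

```rust
use std::time::Duration;

/// Hypothetical AIMD controller; field names and thresholds are illustrative
/// assumptions, not faucet_stream's `AimdController` API.
struct Aimd {
    size: usize,
    min: usize,
    max: usize,
    step: usize,              // additive increase per healthy observation
    decrease_factor: f64,     // multiplicative decrease on error or slow write
    latency_target: Duration, // writes slower than this count as unhealthy
}

impl Aimd {
    /// Feed one observed sub-batch write; returns the new batch size.
    fn observe(&mut self, latency: Duration, had_error: bool) -> usize {
        if had_error || latency > self.latency_target {
            // Multiplicative decrease: back off quickly under pressure.
            let reduced = (self.size as f64 * self.decrease_factor).floor() as usize;
            self.size = reduced.max(self.min);
        } else {
            // Additive increase: probe for more throughput slowly.
            self.size = (self.size + self.step).min(self.max);
        }
        self.size
    }
}

fn main() {
    let mut c = Aimd {
        size: 100,
        min: 10,
        max: 1_000,
        step: 50,
        decrease_factor: 0.5,
        latency_target: Duration::from_millis(200),
    };
    println!("{}", c.observe(Duration::from_millis(50), false)); // healthy: grow
    println!("{}", c.observe(Duration::from_millis(900), false)); // slow: halve
    println!("{}", c.observe(Duration::from_millis(50), true)); // error: halve
}
```

Growing linearly but shrinking geometrically lets the size settle just below the point where the sink starts to struggle, while recovering fast from overload.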

Macros

json
Construct a serde_json::Value from a JSON literal.
schema_for
Generates a Schema for the given type using default settings. The default settings currently conform to JSON Schema 2020-12, but this is liable to change in a future version of Schemars if support for other JSON Schema versions is added.

Structs

AdaptiveBatchConfig
Configuration for the adaptive batch-size controller. Lives under execution.adaptive_batch_size. Default enabled = false (opt-in); when disabled the pipeline writes each page exactly as before.
Adjustment
A size change the controller decided on.
AimdController
Additive-increase / multiplicative-decrease controller. Pure + deterministic.
AuthReference
A { ref: <name> } pointer to a named provider in the top-level auth: catalog. The only permitted key is ref.
CancellationToken
A token which can be used to signal a cancellation request to one or more tasks.
CheckContext
Inputs a probe may need. The doctor command enforces timeout on the whole check() call; connectors may also use it to bound their own client calls.
CheckReport
A connector’s full preflight report.
CheckTally
Per-check counters + elapsed time, keyed by check name. Emitted as metrics by the observability wrapper.
CompiledQuality
A fully compiled quality spec. Built once via CompiledQuality::compile.
DlqConfig
Pipeline-level DLQ wiring.
DlqStats
Counters returned alongside PipelineResult when a DLQ is wired.
DurationGuard
On Drop, records the elapsed time since construction into the named histogram with the supplied labels. Recording on drop guarantees a sample even if the surrounding future is cancelled or panics.
ExplodeSpec (feature: transform-explode)
Spec for TransformStage::Explode. Fans one record out into N records based on the array at path. Compiled into a CompiledExplode at pipeline-build time; per-record work is one path resolve + N clones + N merges.
FileStateStore
File-backed StateStore. Each key maps to a JSON file at {root}/{safe_filename(key)}.json, written via atomic rename. The filename stem percent-encodes : as %3A so keys using the pipeline:rest:issues convention are valid on Windows.
FilterSpec (feature: transform-filter)
Predicate spec for TransformStage::Filter. Compiled into a CompiledFilter at pipeline-build time; per-record work is a single path resolve + comparison.
InstallReport
Report from install_observability so callers can log what actually happened (recorder installed vs. already-installed vs. disabled).
InstrumentedSink
Wraps a &dyn Sink (or any &S: Sink) and emits spans + metrics around write_batch and flush. Constructed by Pipeline::run.
InstrumentedSource
Wraps a &dyn Source (or any &S: Source) and emits spans + metrics around every call. Constructed by Pipeline::run and never exposed to end users; the wrapped source remains the user-facing object.
InstrumentedStateStore
Wraps an Arc<dyn StateStore> and emits faucet.state.* spans + metrics around every operation.
Labels
Common labels carried by every span and metric.
MemoryStateStore
In-memory StateStore for tests and ephemeral pipelines.
ObservabilityConfig
Configuration for install_observability. Either or both sections may be None; unset sections install nothing.
Observation
One observed sub-batch write outcome fed to the controller.
Pipeline
A pipeline that moves data from a Source to a Sink.
PipelineResult
Result of a pipeline run.
Probe
One named probe within a CheckReport (e.g. "read", "auth", "network", "permissions", "schema", "io", "sentinel").
PrometheusConfig
QualityOutcome
Result of applying the quality pass to one page.
QualitySpec
The quality: config block. Per-record checks run first (partitioning the page into survivors + quarantined); per-batch checks then run over the survivors.
QuarantinedRecord
A record removed from the page by a quality check, destined for the DLQ.
ResponseValidator (feature: source-rest)
Optional callback to decide whether the token endpoint response is successful.
RestStream (feature: source-rest)
A configured REST API stream that handles pagination, auth, and extraction.
RestStreamConfig (feature: source-rest)
Configuration for a RestStream.
RunStreamOptions
StreamPage
One page emitted by Source::stream_pages.
TracingConfig
TransformingSource
Source decorator that applies a fixed list of compiled stages to every record. Emits faucet_transform_* metrics per page via instrumented_apply_stages.
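FileStateStore is described as mapping each key to {root}/{safe_filename(key)}.json, percent-encoding : as %3A and writing via atomic rename. The sketch below illustrates that layout with the standard library only. The functions safe_filename, state_path, put, and get here are hypothetical stand-ins; encoding % as %25 (so distinct keys cannot collide) and the .json.tmp temp-file name are assumptions not stated in the docs.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for `safe_filename`: percent-encodes ':' (as documented)
/// and, as an extra assumption, '%' itself so the encoding stays collision-free.
fn safe_filename(key: &str) -> String {
    let mut out = String::with_capacity(key.len());
    for ch in key.chars() {
        match ch {
            '%' => out.push_str("%25"),
            ':' => out.push_str("%3A"),
            other => out.push(other),
        }
    }
    out
}

fn state_path(root: &Path, key: &str) -> PathBuf {
    root.join(format!("{}.json", safe_filename(key)))
}

/// Write `json` for `key`: write a temp file next to the target, then rename it
/// over the target so readers never observe a half-written file.
fn put(root: &Path, key: &str, json: &str) -> io::Result<()> {
    fs::create_dir_all(root)?;
    let target = state_path(root, key);
    let tmp = target.with_extension("json.tmp");
    fs::write(&tmp, json)?;
    fs::rename(&tmp, &target)
}

/// Read the stored JSON for `key`; a missing file means "no bookmark yet".
fn get(root: &Path, key: &str) -> io::Result<Option<String>> {
    match fs::read_to_string(state_path(root, key)) {
        Ok(s) => Ok(Some(s)),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}

fn main() -> io::Result<()> {
    let root = std::env::temp_dir().join("faucet-state-sketch");
    put(&root, "pipeline:rest:issues", r#"{"bookmark":"2024-01-01T00:00:00Z"}"#)?;
    println!("{:?}", get(&root, "pipeline:rest:issues")?);
    Ok(())
}
```

Renaming within the same directory is what makes the replacement atomic on common filesystems; writing the temp file elsewhere could turn the rename into a copy.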

Enums

AdjustDirection
Direction of a batch-size adjustment (metric label).
AdjustReason
Why an adjustment happened (metric label).
Auth (feature: source-rest)
Supported authentication methods.
AuthSpec
A connector’s auth: field: either an inline auth definition A (the { type, config } shape), or a { ref: <name> } reference to a shared provider defined in the top-level auth: catalog.
BatchCheck
A per-batch check, evaluated per page over the survivors of the per-record pass.
CastOnError (feature: transform-cast)
Failure policy for RecordTransform::Cast. Default: Error.
CastType (feature: transform-cast)
Target type for RecordTransform::Cast.
CompareOp
Ordering / equality operator for the compare check.
Compression
Internal post-resolution codec. No Auto variant — call CompressionConfig::resolve.
CompressionConfig
User-facing compression config. Defaults to Auto.
Credential
A resolved credential produced by an AuthProvider or built from inline auth config. Connectors map this onto their wire protocol (HTTP header, gRPC metadata, …).
DlqReason
Reason a page produced DLQ traffic. Used as a metric label and span attribute; closed-set enum so cardinality stays bounded.
FaucetError
All possible errors returned by faucet-stream.
FilterOp (feature: transform-filter)
Comparison operator for FilterSpec.
InstallError
JsonType
Expected JSON type for the type_is check.
KeyCaseMode (feature: transform-keys-case)
Output convention for RecordTransform::KeysCase.
OnBatchError
Policy applied when a sink reports an outer failure (the whole batch failed, no per-row info).
OnFailure
What to do when a check fails. The allowed subset is validated per check at compile time (see compile.rs).
OnMissing (feature: transform-explode)
Behaviour when an ExplodeSpec’s path doesn’t yield a non-empty array. The default is Passthrough because silently dropping records is the worst failure mode for ETL pipelines — surfacing the record unchanged lets downstream stages decide.
PaginationStyle (feature: source-rest)
Supported pagination strategies.
ProbeStatus
Outcome of a single probe.
RecordCheck
A per-record check. Addressed field accepts the filter/explode path subset (bare key, dot.path, $['bracketed']).
RecordTransform
A transformation applied to every record fetched by a source (e.g. the REST source’s RestStream).
ReplicationMethod
Determines how records are replicated from the source.
TransformStage
One stage in a transform pipeline.
Value
Represents any valid JSON value.
ValueCaseMode (feature: transform-value-case)
String-value casing mode for RecordTransform::ValueCase.
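The split between CompressionConfig (user-facing, defaults to Auto) and Compression (internal, no Auto variant, obtained through CompressionConfig::resolve) is a common "resolve once, then use a narrower type" pattern. The sketch below shows it under assumptions: the variant list, the resolve(self, file_name) signature, and the .gz / .zst / .zstd extension rules are hypothetical, not the crate's definitions.

```rust
/// User-facing setting (hypothetical shape): `Auto` defers to the file name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum CompressionConfig {
    #[default]
    Auto,
    None,
    Gzip,
    Zstd,
}

/// Internal codec after resolution. It has no `Auto` variant, so code that
/// receives a `Compression` can never forget to resolve it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Compression {
    None,
    Gzip,
    Zstd,
}

impl CompressionConfig {
    /// Resolve against a file name; the extension rules are assumptions.
    fn resolve(self, file_name: &str) -> Compression {
        match self {
            CompressionConfig::None => Compression::None,
            CompressionConfig::Gzip => Compression::Gzip,
            CompressionConfig::Zstd => Compression::Zstd,
            CompressionConfig::Auto => {
                if file_name.ends_with(".gz") {
                    Compression::Gzip
                } else if file_name.ends_with(".zst") || file_name.ends_with(".zstd") {
                    Compression::Zstd
                } else {
                    Compression::None
                }
            }
        }
    }
}

fn main() {
    let cfg = CompressionConfig::default();
    println!("{:?}", cfg.resolve("events-0001.jsonl.gz")); // prints "Gzip"
}
```

In this sketch an explicit setting always wins over the extension; a disagreement between the two is the situation the warn_mismatch function reports.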

Constants

DEFAULT_BATCH_SIZE
Default page size used when a caller does not specify one.
DEFAULT_EXPIRY_RATIO (feature: source-rest)
Default fraction of expires_in after which the token is refreshed.
DEFAULT_TOKEN_ENDPOINT_EXPIRY_RATIO (feature: source-rest)
Default fraction of expires_in after which the token is refreshed.
MAX_BATCH_SIZE
Hard upper bound on batch_size. Values above this are rejected at config validation time to prevent accidental O(total) buffering in the default implementation of Source::stream_pages; the special value 0 (the “no batching” sentinel) is still accepted.
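The constants above combine into a small validation rule: fall back to a default when nothing is given, accept 0 as the "no batching" sentinel, and reject anything above the hard cap. The sketch below encodes that rule. The numeric values are placeholders (the crate's actual values are not shown on this page), and taking an Option<usize> and returning Result<usize, String> is a hypothetical signature, not the crate's validate_batch_size.

```rust
// Placeholder values for illustration only; not the crate's real constants.
const DEFAULT_BATCH_SIZE: usize = 1_000;
const MAX_BATCH_SIZE: usize = 100_000;

/// Hypothetical validator: `None` falls back to the default, 0 is the
/// "no batching" sentinel and is accepted, anything above the cap is rejected.
fn validate_batch_size(requested: Option<usize>) -> Result<usize, String> {
    match requested {
        None => Ok(DEFAULT_BATCH_SIZE),
        Some(0) => Ok(0),
        Some(n) if n > MAX_BATCH_SIZE => Err(format!(
            "batch_size {n} exceeds MAX_BATCH_SIZE ({MAX_BATCH_SIZE})"
        )),
        Some(n) => Ok(n),
    }
}

fn main() {
    println!("{:?}", validate_batch_size(Some(500)));
    println!("{:?}", validate_batch_size(Some(MAX_BATCH_SIZE + 1)));
}
```

Checking at config-load time means an oversized value fails before any data is read, rather than surfacing later as memory pressure.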

Traits

AuthProvider
A live, shareable source of credentials.
JsonSchema
A type which can be described as a JSON Schema document.
Sink
A sink writes records to an external system.
Source
A source fetches records from an external system.
StateStore
Persistent key/value store for replication bookmarks and pipeline checkpoints.
Stream
A stream of values produced asynchronously.
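StateStore is a persistent key/value store for replication bookmarks, with MemoryStateStore as the in-memory variant for tests. The sketch below shows how such a store supports incremental reads: load the bookmark, keep only newer records, then advance it. The synchronous trait, its get/put method names, MemoryStore, and read_incremental are hypothetical; the crate's actual trait signatures are not reproduced here.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical, synchronous shape of a bookmark store (method names assumed).
trait StateStore: Send + Sync {
    fn get(&self, key: &str) -> Option<String>;
    fn put(&self, key: &str, value: String);
}

/// In-memory store in the spirit of MemoryStateStore: fine for tests, lost on exit.
#[derive(Default)]
struct MemoryStore {
    inner: Mutex<HashMap<String, String>>,
}

impl StateStore for MemoryStore {
    fn get(&self, key: &str) -> Option<String> {
        self.inner.lock().unwrap().get(key).cloned()
    }
    fn put(&self, key: &str, value: String) {
        self.inner.lock().unwrap().insert(key.to_string(), value);
    }
}

/// Hypothetical incremental read: return only cursors greater than the stored
/// bookmark, then advance the bookmark to the largest cursor returned.
fn read_incremental(store: &dyn StateStore, key: &str, cursors: &[u64]) -> Vec<u64> {
    let bookmark = store.get(key).and_then(|s| s.parse::<u64>().ok()).unwrap_or(0);
    let fresh: Vec<u64> = cursors.iter().copied().filter(|&c| c > bookmark).collect();
    if let Some(max) = fresh.iter().max() {
        store.put(key, max.to_string());
    }
    fresh
}

fn main() {
    let store: Arc<dyn StateStore> = Arc::new(MemoryStore::default());
    let first = read_incremental(store.as_ref(), "pipeline:rest:issues", &[1, 2, 3]);
    let second = read_incremental(store.as_ref(), "pipeline:rest:issues", &[2, 3, 4, 5]);
    println!("{first:?} then {second:?}"); // prints "[1, 2, 3] then [4, 5]"
}
```

Sharing the store as Arc<dyn StateStore> mirrors how InstrumentedStateStore is documented to wrap it, so file-backed and in-memory stores are interchangeable.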

Functions

apply_quality
Apply the full per-page quality pass. Pure: no metrics, no DLQ I/O.
build_envelope
Build a single DLQ envelope.
compile_stage
Compile a TransformStage into its CompiledStage form.
compress_buf
One-shot in-memory compression. Used by S3 and GCS sinks that build a full Vec<u8> body before upload.
execute_with_retry
Execute operation with up to max_retries retries on retriable errors, using exponential backoff (base_backoff * 2^attempt) with random jitter. Non-retriable errors return immediately; Ok returns immediately.
fetch_oauth2_token (feature: source-rest)
Fetch an OAuth2 token using the client credentials grant.
fetch_token_from_endpoint (feature: source-rest)
Fetch a token from the given endpoint and extract it using JSONPath.
install_observability (when feature observability-install is disabled)
Stub compiled when the observability-install feature is disabled. Returns an empty report and never panics.
instrumented_apply_quality
Apply the quality pass and emit metrics. Returns the same outcome as apply_quality; on abort, increments faucet_quality_aborts_total before propagating the error.
instrumented_apply_stages
Apply a sequence of compiled stages to every record in records, flat-mapping per stage. Emits one faucet.transform.apply span and counters faucet_transform_records_in_total / _records_out_total per call (per page).
register_build_info
Register the faucet_build_info{version} gauge (set to 1) under the currently-installed metrics recorder. Safe to call from any code path that wants to ensure the gauge is set; install_observability invokes this automatically. Gauges are naturally idempotent under the metrics model — repeat calls just re-set the same value.
run_stream
Run a streaming pipeline, writing each StreamPage to the sink as it arrives and persisting bookmarks per page.
update_bookmark_lag
Update the bookmark-lag gauges if the bookmark is a parseable RFC3339 timestamp. Returns true if the gauges were updated, false otherwise.
validate_batch_size
Validate a batch_size value against the global constraints.
warn_mismatch
Log a one-shot warning when the explicit codec disagrees with the filename’s detected codec. Deduplicates per (path, declared) pair across the whole process so a million-object scan does not flood logs.
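execute_with_retry is documented as retrying retriable errors up to max_retries times with a delay of base_backoff * 2^attempt plus random jitter, returning immediately on Ok or on a non-retriable error. The sketch below implements that policy synchronously with the standard library. The is_retriable closure parameter, the blocking thread::sleep, and the clock-based jitter (a dependency-free stand-in for a real RNG) are assumptions, not the crate's signature.

```rust
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Cheap jitter in [0, max): a dependency-free stand-in for a real RNG.
fn jitter(max: Duration) -> Duration {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.subsec_nanos() as u64)
        .unwrap_or(0);
    let max_nanos = max.as_nanos() as u64;
    if max_nanos == 0 { Duration::ZERO } else { Duration::from_nanos(nanos % max_nanos) }
}

/// Retry sketch: attempt 0 is the first retry; each retriable failure sleeps
/// `base_backoff * 2^attempt` plus jitter, for at most `max_retries` retries.
fn execute_with_retry<T, E>(
    max_retries: u32,
    base_backoff: Duration,
    is_retriable: impl Fn(&E) -> bool,
    mut operation: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match operation() {
            Ok(v) => return Ok(v),
            Err(e) if attempt < max_retries && is_retriable(&e) => {
                let backoff = base_backoff.saturating_mul(2u32.saturating_pow(attempt));
                thread::sleep(backoff.saturating_add(jitter(backoff)));
                attempt += 1;
            }
            // Non-retriable, or retries exhausted: surface the error as-is.
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let mut calls = 0;
    let result: Result<&str, String> = execute_with_retry(
        3,
        Duration::from_millis(10),
        |e: &String| e.starts_with("transient"),
        || {
            calls += 1;
            if calls < 3 { Err("transient: 503".to_string()) } else { Ok("done") }
        },
    );
    println!("{result:?} after {calls} calls"); // prints: Ok("done") after 3 calls
}
```

Jitter spreads out retries from many clients that failed at the same moment, so they do not hit a recovering server in lockstep.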

Type Aliases

RowOutcome
Per-row outcome from Sink::write_batch_partial.
SharedAuthProvider
A shared AuthProvider handle. Cloning it shares the one live provider (and its single token cache) across connectors.
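SharedAuthProvider shares one live provider and its single token cache across connectors, and the auth module describes its providers as single-flight. The sketch below shows one way to get that behaviour with std only: the cache lock is held across the fetch, so concurrent callers wait for the in-flight refresh instead of each calling the token endpoint. TokenProvider, its fields, the 0.8 refresh ratio, and the fetch closure are hypothetical; the ratio only mirrors the idea behind DEFAULT_EXPIRY_RATIO, whose real value is not shown here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// A cached token plus the instant after which it should be refreshed.
struct Cached {
    token: String,
    refresh_at: Instant,
}

/// Hypothetical single-flight provider; cloning the `Arc` shares one cache.
struct TokenProvider {
    cache: Mutex<Option<Cached>>,
    expiry_ratio: f64, // refresh after this fraction of expires_in
    fetch: Box<dyn Fn() -> (String, Duration) + Send + Sync>, // returns (token, expires_in)
}

impl TokenProvider {
    fn token(&self) -> String {
        // Holding the lock across the fetch makes concurrent callers wait for
        // the in-flight refresh rather than starting their own.
        let mut guard = self.cache.lock().unwrap();
        if let Some(c) = guard.as_ref() {
            if Instant::now() < c.refresh_at {
                return c.token.clone();
            }
        }
        let (token, expires_in) = (self.fetch)();
        let refresh_at = Instant::now() + expires_in.mul_f64(self.expiry_ratio);
        *guard = Some(Cached { token: token.clone(), refresh_at });
        token
    }
}

fn main() {
    let fetches = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&fetches);
    let provider = Arc::new(TokenProvider {
        cache: Mutex::new(None),
        expiry_ratio: 0.8,
        fetch: Box::new(move || {
            counter.fetch_add(1, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(20)); // simulate a slow token endpoint
            ("tok-1".to_string(), Duration::from_secs(3600))
        }),
    });
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let p = Arc::clone(&provider);
            thread::spawn(move || p.token())
        })
        .collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), "tok-1");
    }
    println!("fetches: {}", fetches.load(Ordering::SeqCst)); // prints "fetches: 1"
}
```

A single mutex is the simplest single-flight scheme; its cost is that readers also wait during a refresh, which is acceptable when refreshes are rare.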

Attribute Macros

async_trait

Derive Macros

JsonSchema
Derive macro for JsonSchema trait.