faucet-core
Shared types, traits, and utilities for the faucet-stream ecosystem.
This is the foundation crate that all faucet source and sink connectors depend on. If you're building a custom connector, this is the only dependency you need.
Installation
[dependencies]
faucet-core = "0.1"
tokio = { version = "1", features = ["rt"] }
What's Inside
Traits
- `Source`: async trait for fetching records from external systems
- `Sink`: async trait for writing records to external systems
Both traits include a config_schema() method that returns a JSON Schema describing the connector's configuration.
use ;
Pipeline
Connect any source to any sink:
use ;
// Batch mode: fetch all, then write
let result = new.run.await?;
println!;
// Streaming mode: write page-by-page (bounded memory)
let result = run_stream.await?;
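
The memory trade-off between the two modes can be illustrated with a std-only, synchronous sketch. Everything in it (`PageSource`, `RecordSink`, `run_batch`, `run_streaming`, `demo_source`) is a hypothetical stand-in rather than faucet-core API; the real traits are async and their signatures may differ.

```rust
use std::collections::VecDeque;

// Hypothetical, synchronous stand-ins for illustration only.
type Record = String;

trait PageSource {
    /// Returns the next page of records, or None when exhausted.
    fn next_page(&mut self) -> Option<Vec<Record>>;
}

trait RecordSink {
    fn write(&mut self, records: &[Record]);
}

/// Batch mode: buffer every page, then write once.
/// Returns (records written, peak number of records buffered).
fn run_batch(src: &mut dyn PageSource, sink: &mut dyn RecordSink) -> (usize, usize) {
    let mut all = Vec::new();
    while let Some(page) = src.next_page() {
        all.extend(page);
    }
    sink.write(&all);
    (all.len(), all.len())
}

/// Streaming mode: write each page as it arrives, so at most one page is held.
fn run_streaming(src: &mut dyn PageSource, sink: &mut dyn RecordSink) -> (usize, usize) {
    let (mut written, mut peak) = (0, 0);
    while let Some(page) = src.next_page() {
        peak = peak.max(page.len());
        sink.write(&page);
        written += page.len();
    }
    (written, peak)
}

/// In-memory source that yields pre-built pages.
struct VecSource(VecDeque<Vec<Record>>);

impl PageSource for VecSource {
    fn next_page(&mut self) -> Option<Vec<Record>> {
        self.0.pop_front()
    }
}

/// In-memory sink that collects everything it is given.
#[derive(Default)]
struct VecSink(Vec<Record>);

impl RecordSink for VecSink {
    fn write(&mut self, records: &[Record]) {
        self.0.extend_from_slice(records);
    }
}

/// Builds a source with `pages` pages of `per_page` records each.
fn demo_source(pages: usize, per_page: usize) -> VecSource {
    VecSource(
        (0..pages)
            .map(|p| (0..per_page).map(|i| format!("rec-{p}-{i}")).collect::<Vec<Record>>())
            .collect(),
    )
}
```

With three pages of two records each, both functions write six records, but the batch variant buffers all six at once while the streaming variant never holds more than two.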
Error Types
FaucetError covers all failure modes:
| Variant | Use Case |
|---|---|
| `Http(reqwest::Error)` | HTTP transport errors |
| `HttpStatus { status, url, body }` | Non-success HTTP responses |
| `Json(serde_json::Error)` | JSON parse/serialize errors |
| `JsonPath(String)` | JSONPath extraction failures |
| `Auth(String)` | Authentication errors |
| `RateLimited { retry_after }` | 429 rate limit responses |
| `Url(String)` | URL construction errors |
| `Transform(String)` | Record transform errors |
| `Config(String)` | Configuration/validation errors |
| `Source(String)` | Source-specific errors |
| `Sink(String)` | Sink-specific errors |
| `Custom(Box<dyn Error>)` | Wrap any third-party error |
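
As a rough illustration of how a caller might branch on these variants, here is a std-only sketch. `SketchError` is a trimmed-down stand-in that mirrors four of the variants above, and `retry_delay` is a hypothetical policy, not part of the crate. The field types (`u16`, `String`, `Option<Duration>`) and the `Send + Sync` bound on the boxed error are assumptions.

```rust
use std::error::Error;
use std::fmt;
use std::time::Duration;

// Trimmed-down stand-in mirroring a few variants from the table above.
// Field types are assumptions; consult the crate docs for the real ones.
#[derive(Debug)]
enum SketchError {
    HttpStatus { status: u16, url: String, body: String },
    RateLimited { retry_after: Option<Duration> },
    Config(String),
    Custom(Box<dyn Error + Send + Sync>),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::HttpStatus { status, url, body } => {
                write!(f, "HTTP {status} from {url}: {body}")
            }
            Self::RateLimited { retry_after } => {
                write!(f, "rate limited (retry after {retry_after:?})")
            }
            Self::Config(msg) => write!(f, "config error: {msg}"),
            Self::Custom(err) => write!(f, "{err}"),
        }
    }
}

impl Error for SketchError {}

/// Hypothetical retry policy: how long to wait before retrying, if at all.
fn retry_delay(err: &SketchError) -> Option<Duration> {
    match err {
        // Honor the server's hint, falling back to one second.
        SketchError::RateLimited { retry_after } => {
            Some(retry_after.unwrap_or(Duration::from_secs(1)))
        }
        // Server-side failures are often transient.
        SketchError::HttpStatus { status, .. } if *status >= 500 => {
            Some(Duration::from_secs(2))
        }
        // Everything else is treated as permanent.
        _ => None,
    }
}
```

Keeping structured fields such as `status` and `retry_after` on the variants, rather than folding them into a message string, is what makes this kind of policy a plain `match`.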
Config Loading
Load any Deserialize-able config struct from JSON files or environment variables:
use ;
// From a JSON file
let config: MyConfig = load_json?;
// From environment variables (reads MYAPP_URL, MYAPP_BATCH_SIZE, etc.)
let config: MyConfig = load_env?;
// From a .env file + environment variables
let config: MyConfig = load_env_file?;
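
The prefix convention in the environment-variable comment (`MYAPP_URL`, `MYAPP_BATCH_SIZE`) can be sketched with the standard library alone. The `MyConfig` fields and the `from_env_pairs` helper below are hypothetical and inferred from that comment; the crate's own loader works with any `Deserialize` type and may behave differently.

```rust
use std::collections::HashMap;

// Hypothetical config struct; the field names are inferred from the
// MYAPP_URL / MYAPP_BATCH_SIZE example and are assumptions.
#[derive(Debug, PartialEq)]
struct MyConfig {
    url: String,
    batch_size: usize,
}

/// Builds a MyConfig from (name, value) pairs that carry `prefix`.
/// A stand-in for prefix-based loading, not the crate's implementation.
fn from_env_pairs<I>(prefix: &str, vars: I) -> Result<MyConfig, String>
where
    I: IntoIterator<Item = (String, String)>,
{
    // Keep only prefixed variables and normalize "MYAPP_BATCH_SIZE" -> "batch_size".
    let fields: HashMap<String, String> = vars
        .into_iter()
        .filter_map(|(name, value)| {
            name.strip_prefix(prefix)
                .map(|rest| (rest.to_ascii_lowercase(), value))
        })
        .collect();

    let url = fields
        .get("url")
        .cloned()
        .ok_or_else(|| format!("missing {prefix}URL"))?;
    let batch_size = fields
        .get("batch_size")
        .ok_or_else(|| format!("missing {prefix}BATCH_SIZE"))?
        .parse::<usize>()
        .map_err(|e| format!("invalid {prefix}BATCH_SIZE: {e}"))?;

    Ok(MyConfig { url, batch_size })
}
```

Passing `std::env::vars()` as `vars` with the prefix `"MYAPP_"` reads the real process environment; taking an iterator keeps the helper easy to test without mutating global state.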
Duration Serde Helpers
For Duration fields in configs, use the provided serde modules:
use std::time::Duration;
use ;
Record Transforms
Transform records as they flow through the pipeline:
use RecordTransform;
// Flatten nested objects: {"user": {"id": 1}} -> {"user__id": 1}
Flatten
// Convert keys to snake_case
KeysToSnakeCase
// Regex key renaming
RenameKeys
// Custom closure
custom
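
To make the flatten and snake_case behavior concrete, here is a std-only sketch of both ideas. The `Value` enum is a minimal stand-in for `serde_json::Value`, and `flatten` and `to_snake_case` are hypothetical helpers; they are not the crate's implementation, which may treat edge cases (empty objects, arrays, acronyms) differently.

```rust
use std::collections::BTreeMap;

// Minimal JSON-like value, standing in for serde_json::Value so the
// sketch needs no external crates.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Num(i64),
    Str(String),
    Obj(BTreeMap<String, Value>),
}

/// Flattens nested objects, joining key segments with "__".
/// Empty nested objects produce no output keys in this sketch.
fn flatten(record: &BTreeMap<String, Value>) -> BTreeMap<String, Value> {
    fn walk(prefix: &str, map: &BTreeMap<String, Value>, out: &mut BTreeMap<String, Value>) {
        for (key, value) in map {
            let path = if prefix.is_empty() {
                key.clone()
            } else {
                format!("{prefix}__{key}")
            };
            match value {
                Value::Obj(inner) => walk(&path, inner, out),
                leaf => {
                    out.insert(path, leaf.clone());
                }
            }
        }
    }
    let mut out = BTreeMap::new();
    walk("", record, &mut out);
    out
}

/// Converts a camelCase or PascalCase key to snake_case.
/// Runs of capitals such as "ID" are split letter by letter here.
fn to_snake_case(key: &str) -> String {
    let mut out = String::with_capacity(key.len() + 4);
    for (i, ch) in key.chars().enumerate() {
        if ch.is_ascii_uppercase() {
            // Insert a separator before every capital except a leading one.
            if i > 0 && !out.ends_with('_') {
                out.push('_');
            }
            out.push(ch.to_ascii_lowercase());
        } else {
            out.push(ch);
        }
    }
    out
}
```

Flattening `{"user": {"id": 1}}` with this sketch yields the single key `user__id`, matching the example in the comment above, and `to_snake_case("userId")` yields `user_id`.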
Replication
Incremental replication support:
use ReplicationMethod;
use ;
// Filter records newer than a bookmark
let filtered = filter_incremental;
// Compute new bookmark from records
let new_bookmark = max_replication_value;
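
The bookmark cycle (filter by the old bookmark, then compute the new one) might look roughly like the std-only sketch below. `filter_newer`, `max_value`, and the string-map `Record` type are hypothetical stand-ins for the crate's functions. The strict `>` comparison and the decision to drop records that lack the replication key are assumptions.

```rust
use std::collections::BTreeMap;

// Hypothetical record shape: field name -> string value. ISO 8601 UTC
// timestamps in the same format compare correctly as plain strings.
type Record = BTreeMap<String, String>;

/// Keeps records whose `key` field is strictly greater than `bookmark`.
/// With no bookmark (first sync), every record that has the key is kept.
fn filter_newer(records: Vec<Record>, key: &str, bookmark: Option<&str>) -> Vec<Record> {
    records
        .into_iter()
        .filter(|rec| match (rec.get(key), bookmark) {
            (Some(value), Some(mark)) => value.as_str() > mark,
            (Some(_), None) => true,
            // Records without the replication key cannot be ordered; drop them.
            (None, _) => false,
        })
        .collect()
}

/// Returns the largest `key` value seen, to be stored as the next bookmark.
fn max_value(records: &[Record], key: &str) -> Option<String> {
    records.iter().filter_map(|rec| rec.get(key)).max().cloned()
}

/// Convenience constructor for sample records.
fn record(id: &str, updated_at: &str) -> Record {
    BTreeMap::from([
        ("id".to_string(), id.to_string()),
        ("updated_at".to_string(), updated_at.to_string()),
    ])
}
```

On each run, a connector following this pattern would persist the result of `max_value` and pass it back as `bookmark` on the next run, so only records changed since the previous sync are processed.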
Schema Inference
Infer JSON Schema from record samples:
use infer_schema;
let schema = infer_schema;
// Returns a JSON Schema with inferred types, nullable fields, nested objects
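
For intuition about what inference from samples involves, here is a simplified std-only sketch. It is not the crate's algorithm: `Value`, `FieldSchema`, and `infer_fields` are hypothetical, nested objects are left out, and the rule that a field missing from any sample counts as nullable is an assumption.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Minimal JSON-like value standing in for serde_json::Value.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
}

/// Inferred description of one field: the JSON type names observed and
/// whether the field was ever null or missing.
#[derive(Debug, Default, PartialEq)]
struct FieldSchema {
    types: BTreeSet<&'static str>,
    nullable: bool,
}

/// Infers a per-field schema from flat sample records.
fn infer_fields(samples: &[BTreeMap<String, Value>]) -> BTreeMap<String, FieldSchema> {
    let mut schema: BTreeMap<String, FieldSchema> = BTreeMap::new();
    // First pass: record the type of every value seen.
    for record in samples {
        for (key, value) in record {
            let field = schema.entry(key.clone()).or_default();
            match value {
                Value::Null => field.nullable = true,
                Value::Bool(_) => {
                    field.types.insert("boolean");
                }
                Value::Num(_) => {
                    field.types.insert("number");
                }
                Value::Str(_) => {
                    field.types.insert("string");
                }
            }
        }
    }
    // Second pass: a field missing from any sample is also nullable.
    for (key, field) in schema.iter_mut() {
        if samples.iter().any(|record| !record.contains_key(key)) {
            field.nullable = true;
        }
    }
    schema
}
```

A field that is always present with a single type comes out as non-nullable with one entry in `types`; a field observed with several types accumulates all of them, which a real implementation could emit as a JSON Schema type array.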
JSON Schema Generation
All config structs derive schemars::JsonSchema. Use schema_for! to generate schemas:
use ;
use ;
let schema = schema_for!;
let json = to_value?;
Re-exports
faucet-core re-exports common dependencies so connector authors only need one dependency:
| Re-export | From |
|---|---|
| `async_trait` | `async-trait` |
| `serde_json`, `Value`, `json!` | `serde_json` |
| `schemars`, `JsonSchema`, `schema_for!` | `schemars` |
Modules
| Module | Contents |
|---|---|
| `traits` | `Source` and `Sink` async traits |
| `error` | `FaucetError` enum |
| `pipeline` | `Pipeline`, `PipelineResult`, `run_stream` |
| `config` | `load_json`, `load_env`, `load_env_file`, duration serde helpers |
| `transform` | `RecordTransform`, `CompiledTransform` |
| `replication` | `ReplicationMethod`, `filter_incremental`, `max_replication_value` |
| `schema` | `infer_schema` |
| `util` | `quote_ident`, `extract_records`, `check_http_response` |
License
Licensed under either of MIT or Apache-2.0 at your option.