faucet-stream
A declarative, config-driven REST API client for Rust with pluggable authentication, pagination, record transforms, schema inference, and incremental replication.
Inspired by Meltano's RESTStream — but for Rust, and as a reusable library.
Features
- Authentication — Bearer, Basic, API Key (header or query param), OAuth2 (client credentials), or custom headers
- Pagination — cursor/token (JSONPath), page number, offset/limit, Link header, next-link-in-body
- JSONPath extraction — point at where records live in any JSON response
- Record transforms — flatten nested objects, rename keys (regex), snake_case normalisation, or custom closures
- Schema inference — automatically derive a JSON Schema from sampled records
- Incremental replication — bookmark-based filtering so you only fetch new records
- Partitions — run the same stream across multiple contexts (e.g. per-org, per-repo)
- Retries with backoff — exponential backoff with configurable limits and 429 rate-limit handling
- Typed deserialization — get `Vec<Value>` or deserialize directly into your structs
- Async-first — built on `reqwest` + `tokio`
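The retry bullet can be pictured with a small, std-only sketch of an exponential backoff schedule. The function names, the doubling factor, the cap, and the rule of retrying on 429 and 5xx are assumptions for illustration. They are not faucet-stream's actual retry policy or defaults.

```rust
use std::time::Duration;

/// Hypothetical backoff schedule: the delay doubles after each failed attempt
/// (attempt 0 waits `base`) and never exceeds `max_delay`.
fn backoff_delay(attempt: u32, base: Duration, max_delay: Duration) -> Duration {
    // 2^attempt, saturating so large attempt counts cannot overflow.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max_delay).min(max_delay)
}

/// Assumed rule: rate limiting (429) and server errors (5xx) are retryable,
/// other client errors are not.
fn is_retryable(status: u16) -> bool {
    status == 429 || (500..=599).contains(&status)
}
```

With a 500 ms base and a 30 s cap, attempts 0, 1, 2, 3 wait 0.5 s, 1 s, 2 s, 4 s, and from attempt 6 onward every wait is clamped to 30 s.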
Quick Start
Add to your Cargo.toml:
[dependencies]
faucet-stream = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
Cursor-based pagination with Bearer auth
use ;
async
Page-number pagination with API key
use ;
let stream = new?;
Offset pagination with Basic auth
use ;
use std::time::Duration;
let stream = new?;
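Offset and page-number pagination differ only in how the next request's query parameters are derived. Here is a minimal std-only sketch, assuming the API signals the last page by returning fewer records than requested. The parameter names (`offset`, `limit`, `page`, `per_page`) mirror the pagination table, but the helper functions and the short-page stopping rule are hypothetical, not necessarily what the crate does.

```rust
/// Next (offset, limit) pair, or `None` when the previous page came back short
/// (assumed end-of-data signal).
fn next_offset_params(offset: usize, limit: usize, returned: usize) -> Option<(usize, usize)> {
    if returned < limit {
        None
    } else {
        Some((offset + limit, limit))
    }
}

/// Same idea for page-number pagination (pages assumed to start at 1).
fn next_page_params(page: usize, per_page: usize, returned: usize) -> Option<(usize, usize)> {
    if returned < per_page {
        None
    } else {
        Some((page + 1, per_page))
    }
}

/// Renders an (offset, limit) pair as a query string.
fn offset_query((offset, limit): (usize, usize)) -> String {
    format!("?offset={offset}&limit={limit}")
}
```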
OAuth2 client credentials
use ;
let token = fetch_oauth2_token.await?;
let config = new
.auth;
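OAuth2 support includes automatic token caching and refresh. The caching half can be sketched without any HTTP: reuse a token until shortly before it expires, then fetch a new one. `TokenCache`, the safety margin, and the `fetch` closure (a stand-in for the token request) are hypothetical. Times are plain seconds so the logic stays deterministic, whereas a real client would read a clock.

```rust
/// A cached access token and the time (in seconds) at which it expires.
struct CachedToken {
    value: String,
    expires_at: u64,
}

/// Hypothetical cache: refreshes when the token is missing or within
/// `margin` seconds of expiring.
struct TokenCache {
    token: Option<CachedToken>,
    margin: u64,
}

impl TokenCache {
    fn new(margin: u64) -> Self {
        TokenCache { token: None, margin }
    }

    /// Returns a valid token, calling `fetch` only when a refresh is needed.
    /// `fetch` stands in for the token request and returns (token, lifetime_secs).
    fn get(&mut self, now: u64, mut fetch: impl FnMut() -> (String, u64)) -> &str {
        let needs_refresh = match &self.token {
            Some(t) => now + self.margin >= t.expires_at,
            None => true,
        };
        if needs_refresh {
            let (value, lifetime) = fetch();
            self.token = Some(CachedToken { value, expires_at: now + lifetime });
        }
        &self.token.as_ref().unwrap().value
    }
}
```

Refreshing slightly early (the margin) avoids sending a token that expires while the request is in flight.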
Streaming page-by-page
Process records as each page arrives without waiting for all pages to complete:
use ;
use StreamExt;
let stream = new?;
let mut pages = stream.stream_pages;
while let Some = pages.next.await
Typed deserialization
use serde::Deserialize;
use ;
let stream = new?;
let users: = stream..await?;
Record transforms
Transform every record as it's extracted. Built-in transforms are feature-gated (all enabled by default):
use ;
let stream = new?;
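Flattening turns nested objects into a single level of keys. The sketch below shows the idea on a tiny hypothetical `Json` enum standing in for `serde_json::Value`. It ignores arrays, and the `__` separator follows Meltano's convention; whether `RecordTransform::Flatten` uses the same separator is an assumption.

```rust
use std::collections::BTreeMap;

/// Minimal stand-in for a JSON value: an object or a scalar rendered as text.
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Object(BTreeMap<String, Json>),
    Scalar(String),
}

/// Recursively flattens nested objects, joining key paths with `sep`.
fn flatten(value: &Json, sep: &str) -> BTreeMap<String, Json> {
    let mut out = BTreeMap::new();
    flatten_into(value, "", sep, &mut out);
    out
}

fn flatten_into(value: &Json, prefix: &str, sep: &str, out: &mut BTreeMap<String, Json>) {
    match value {
        Json::Object(map) => {
            for (key, child) in map {
                let path = if prefix.is_empty() {
                    key.clone()
                } else {
                    format!("{prefix}{sep}{key}")
                };
                flatten_into(child, &path, sep, out);
            }
        }
        // Scalars are leaves: store them under the accumulated key path.
        Json::Scalar(_) => {
            out.insert(prefix.to_string(), value.clone());
        }
    }
}
```

A record like `{"id": 1, "address": {"city": "Oslo"}}` becomes `{"address__city": "Oslo", "id": 1}`.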
Disable transforms you don't need:
[dependencies]
faucet-stream = { version = "0.1", default-features = false, features = ["transform-flatten"] }
Schema inference
Automatically derive a JSON Schema from sampled records:
use ;
let stream = new?;
let schema = stream.infer_schema.await?;
// Returns a JSON Schema object with inferred types, nullable fields, etc.
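Schema inference comes down to merging the types observed for each field across the sampled records. This std-only sketch makes several assumptions: records are flat, each sampled value has already been classified into a hypothetical `FieldType`, a field is nullable when it is null or missing in at least one sample, and integer widens to number when both appear. It illustrates the idea and is not the crate's exact algorithm.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Scalar JSON types this sketch distinguishes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum FieldType {
    Null,
    Boolean,
    Integer,
    Number,
    String,
}

/// Collects, per field, every type observed across the samples.
fn infer_field_types(samples: &[BTreeMap<String, FieldType>]) -> BTreeMap<String, BTreeSet<FieldType>> {
    let mut schema: BTreeMap<String, BTreeSet<FieldType>> = BTreeMap::new();
    for record in samples {
        for (field, ty) in record {
            schema.entry(field.clone()).or_default().insert(*ty);
        }
    }
    // Assumed rule: a field missing from any record becomes nullable.
    for record in samples {
        for (field, types) in schema.iter_mut() {
            if !record.contains_key(field) {
                types.insert(FieldType::Null);
            }
        }
    }
    schema
}

/// Renders a type set as JSON Schema type names, widening integer to number
/// when both were seen.
fn json_schema_types(types: &BTreeSet<FieldType>) -> Vec<&'static str> {
    let widen = types.contains(&FieldType::Integer) && types.contains(&FieldType::Number);
    types
        .iter()
        .filter(|t| !(widen && **t == FieldType::Integer))
        .map(|t| match t {
            FieldType::Null => "null",
            FieldType::Boolean => "boolean",
            FieldType::Integer => "integer",
            FieldType::Number => "number",
            FieldType::String => "string",
        })
        .collect()
}
```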
Incremental replication
Only fetch records newer than a stored bookmark:
use ;
use serde_json::json;
let stream = new?;
// fetch_all_incremental returns records + the new bookmark to persist
let = stream.fetch_all_incremental.await?;
// Save `bookmark` for the next run
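Bookmark-based filtering can be sketched in a few lines: keep records whose replication key is strictly greater than the stored bookmark, then advance the bookmark to the largest key kept. This sketch assumes the replication key is a fixed-format UTC timestamp string (such as `2024-05-01T12:00:00Z`), so lexical order matches chronological order. The `Record` type and `filter_incremental` are hypothetical.

```rust
/// Hypothetical record with an `updated_at` replication key.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    id: u32,
    updated_at: String, // fixed-format UTC timestamp, e.g. "2024-05-01T12:00:00Z"
}

/// Keeps records newer than `bookmark` and returns them with the new bookmark.
/// With no bookmark (first run), every record is kept.
fn filter_incremental(records: Vec<Record>, bookmark: Option<&str>) -> (Vec<Record>, Option<String>) {
    let fresh: Vec<Record> = records
        .into_iter()
        .filter(|r| bookmark.map_or(true, |b| r.updated_at.as_str() > b))
        .collect();
    // New bookmark: the max key among kept records, or the old one if nothing was new.
    let new_bookmark = fresh
        .iter()
        .map(|r| r.updated_at.clone())
        .max()
        .or_else(|| bookmark.map(str::to_string));
    (fresh, new_bookmark)
}
```

A strict `>` never re-emits the boundary record but can skip a record that shares the bookmark's exact timestamp. Using `>=` instead trades that risk for one duplicate per run.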
Partitions
Run the same stream config across multiple contexts:
use ;
use serde_json::json;
use std::collections::HashMap;
let stream = new?;
// Fetches repos for both orgs and concatenates the results
let repos = stream.fetch_all.await?;
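A partition is essentially a set of values substituted into each request. The std-only sketch below assumes path templates use `{name}` placeholders. That syntax, the helper names, and the org values are hypothetical, since the crate's own placeholder format is not shown here. Each partition context expands into one concrete path, and the runner would then fetch each path and concatenate the results.

```rust
use std::collections::HashMap;

/// Replaces every `{key}` placeholder in `template` with the matching context value
/// (hypothetical placeholder syntax).
fn expand_path(template: &str, context: &HashMap<&str, &str>) -> String {
    let mut path = template.to_string();
    for (key, value) in context {
        path = path.replace(&format!("{{{key}}}"), value);
    }
    path
}

/// One concrete path per partition context, in input order.
fn partition_paths(template: &str, partitions: &[HashMap<&str, &str>]) -> Vec<String> {
    partitions.iter().map(|ctx| expand_path(template, ctx)).collect()
}
```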
Authentication Methods
| Method | Description |
|---|---|
| `Bearer` | `Authorization: Bearer <token>` header |
| `Basic` | `Authorization: Basic <base64>` header |
| `ApiKey` | Custom header (e.g. `X-Api-Key: secret`) |
| `ApiKeyQuery` | API key as a query parameter (e.g. `?api_key=secret`) |
| `OAuth2` | Client credentials flow with automatic token caching and refresh |
| `Custom` | Arbitrary headers |
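The `Basic` header value is the standard HTTP Basic construction from RFC 7617: Base64 of `username:password`. The sketch below is a dependency-free version of that encoding for illustration (a real client would normally use a base64 crate). `basic_auth_header` is a hypothetical helper name, not part of faucet-stream.

```rust
const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Standard Base64 (RFC 4648) with `=` padding.
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group (missing bytes are zero).
        let b = [chunk[0], *chunk.get(1).unwrap_or(&0), *chunk.get(2).unwrap_or(&0)];
        let group = (u32::from(b[0]) << 16) | (u32::from(b[1]) << 8) | u32::from(b[2]);
        // Emit one character per 6 bits; pad with '=' when the chunk was short.
        for i in 0..4 {
            if i <= chunk.len() {
                let index = (group >> (18 - 6 * i)) & 0x3F;
                out.push(ALPHABET[index as usize] as char);
            } else {
                out.push('=');
            }
        }
    }
    out
}

/// Builds the `Authorization` header value for HTTP Basic auth.
fn basic_auth_header(username: &str, password: &str) -> String {
    format!("Basic {}", base64_encode(format!("{username}:{password}").as_bytes()))
}
```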
Pagination Styles
| Style | Use When |
|---|---|
| `Cursor` | API returns a next-page token in the response body |
| `PageNumber` | API uses `?page=1&per_page=100` style |
| `Offset` | API uses `?offset=0&limit=50` style |
| `LinkHeader` | API returns pagination in the `Link` HTTP header (GitHub-style) |
| `NextLinkInBody` | API returns the full next-page URL in the response body |
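`LinkHeader` pagination follows the entry marked `rel="next"` in an RFC 8288 `Link` header, which is the format GitHub uses. Below is a deliberately simple std-only parser. It assumes URLs contain no commas or semicolons, which covers typical pagination links but is not a full RFC 8288 parser. `next_link` is a hypothetical helper.

```rust
/// Returns the URL tagged `rel="next"` in a Link header value, if any.
fn next_link(header: &str) -> Option<&str> {
    header.split(',').find_map(|part| {
        let mut pieces = part.split(';');
        // First piece is `<url>`; the rest are parameters like `rel="next"`.
        let url = pieces.next()?.trim().strip_prefix('<')?.strip_suffix('>')?;
        let is_next = pieces.any(|param| {
            let param = param.trim();
            param == "rel=\"next\"" || param == "rel=next"
        });
        is_next.then_some(url)
    })
}
```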
All pagination styles include loop detection — if the same cursor or link is returned twice in a row, pagination stops automatically.
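That loop-detection rule fits in a few lines: remember the cursor (or link) just sent and stop when the API hands back the same one. In this std-only sketch, `paginate` and its `fetch` closure are hypothetical. The closure stands in for one HTTP request and returns a page's records plus the next cursor.

```rust
/// Drives cursor pagination until there is no next cursor or the API returns
/// the same cursor twice in a row.
fn paginate<F>(mut fetch: F) -> Vec<String>
where
    F: FnMut(Option<&str>) -> (Vec<String>, Option<String>),
{
    let mut records = Vec::new();
    let mut cursor: Option<String> = None;
    loop {
        let (page, next) = fetch(cursor.as_deref());
        records.extend(page);
        match next {
            // Same cursor as the one just sent: the API is looping, so stop.
            Some(ref n) if cursor.as_deref() == Some(n.as_str()) => break,
            Some(n) => cursor = Some(n),
            None => break,
        }
    }
    records
}
```

Against an API that keeps echoing cursor `b`, the loop makes three requests (no cursor, `a`, `b`) and then stops instead of spinning forever.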
Feature Flags
| Feature | Default | Description |
|---|---|---|
| `transform-flatten` | yes | `RecordTransform::Flatten` — flatten nested objects |
| `transform-rename-keys` | yes | `RecordTransform::RenameKeys` — regex key renaming (pulls in `regex`) |
| `transform-snake-case` | yes | `RecordTransform::KeysToSnakeCase` — Meltano-compatible snake_case (pulls in `regex`) |
| `transforms` | no | Convenience: enables all three transform features |
RecordTransform::Custom is always available regardless of feature flags.
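`KeysToSnakeCase` is described as Meltano-compatible. The sketch below is a rough std-only approximation of camelCase/PascalCase to snake_case conversion, meant only to build intuition. `to_snake_case` is a hypothetical helper, and the exact rules the crate and Meltano apply to acronyms, digits, and special characters may differ.

```rust
/// Approximate snake_case conversion: inserts `_` at lower-to-upper and
/// acronym-to-word boundaries, lowercases, and maps spaces/hyphens to `_`.
fn to_snake_case(key: &str) -> String {
    let chars: Vec<char> = key.chars().collect();
    let mut out = String::new();
    for (i, &c) in chars.iter().enumerate() {
        if c == ' ' || c == '-' {
            if !out.ends_with('_') {
                out.push('_');
            }
            continue;
        }
        if c.is_uppercase() && i > 0 {
            let prev = chars[i - 1];
            let next_is_lower = chars.get(i + 1).map_or(false, |n| n.is_lowercase());
            // Boundary after a lowercase letter or digit ("userId"), or at the end
            // of an acronym followed by a word ("HTTPServer" -> "http_server").
            if (prev.is_lowercase() || prev.is_ascii_digit() || (prev.is_uppercase() && next_is_lower))
                && !out.ends_with('_')
            {
                out.push('_');
            }
        }
        out.extend(c.to_lowercase());
    }
    out
}
```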
Project Structure
- src/
  - lib.rs — library entry point and re-exports
  - config.rs — RestStreamConfig with fluent builder API
  - stream.rs — RestStream: main executor (fetch_all, stream_pages, infer_schema)
  - error.rs — FaucetError enum
  - auth/
    - mod.rs — Auth enum
    - bearer.rs — Bearer token auth
    - basic.rs — HTTP Basic auth
    - api_key.rs — API key header auth
    - custom.rs — Custom header auth
    - oauth2.rs — OAuth2 client credentials with token caching
  - pagination/
    - mod.rs — PaginationStyle enum and PaginationState
    - cursor.rs — Cursor/token-based pagination
    - page.rs — Page number pagination
    - offset.rs — Offset/limit pagination
    - link_header.rs — Link header pagination
    - next_link_body.rs — Next-link-in-body pagination
  - extract/ — JSONPath record extraction
  - retry/ — Exponential backoff retry executor
  - replication.rs — Incremental replication (filtering + bookmarking)
  - schema.rs — JSON Schema inference from record samples
  - transform.rs — Record transform pipeline (flatten, rename, snake_case, custom)
- tests/
  - auth_test.rs — Auth integration tests
  - pagination_test.rs — Pagination integration tests
  - stream_test.rs — Stream integration tests
License
Licensed under either of
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.