faucet-stream

A declarative, config-driven REST API client for Rust with pluggable
authentication, pagination, and JSONPath extraction.
Inspired by Meltano's RESTStream
— but for Rust, and as a reusable library.
Features
- Authentication — Bearer, Basic, API Key, OAuth2 (client credentials), or custom headers
- Pagination — cursor/token (JSONPath), page number, offset/limit, Link header
- JSONPath extraction — point at where records live in any JSON response
- Retries with backoff — exponential backoff with configurable limits
- Typed deserialization — get
Vec<Value> or deserialize directly into your structs
- Async-first — built on
reqwest + tokio
Quick Start
Add to your Cargo.toml:
[dependencies]
faucet-stream = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
Cursor-based pagination with Bearer auth
use faucet_stream::{RestStream, RestStreamConfig, Auth, PaginationStyle};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream = RestStream::new(
RestStreamConfig::new("https://api.example.com", "/v1/users")
.auth(Auth::Bearer("my-token".into()))
.records_path("$.data[*]")
.pagination(PaginationStyle::Cursor {
next_token_path: "$.meta.next_cursor".into(),
param_name: "cursor".into(),
})
.max_pages(50),
)?;
let users: Vec<serde_json::Value> = stream.fetch_all().await?;
println!("Fetched {} users", users.len());
Ok(())
}
Page-number pagination with API key
use faucet_stream::{RestStream, RestStreamConfig, Auth, PaginationStyle};
let stream = RestStream::new(
RestStreamConfig::new("https://api.example.com", "/v2/orders")
.auth(Auth::ApiKey {
header: "X-Api-Key".into(),
value: "secret".into(),
})
.records_path("$.results[*]")
.pagination(PaginationStyle::PageNumber {
param_name: "page".into(),
start_page: 1,
page_size: Some(100),
page_size_param: Some("per_page".into()),
}),
)?;
Offset pagination with Basic auth
use faucet_stream::{RestStream, RestStreamConfig, Auth, PaginationStyle};
use std::time::Duration;
let stream = RestStream::new(
RestStreamConfig::new("https://api.example.com", "/records")
.auth(Auth::Basic {
username: "user".into(),
password: "pass".into(),
})
.records_path("$.items[*]")
.pagination(PaginationStyle::Offset {
offset_param: "offset".into(),
limit_param: "limit".into(),
limit: 50,
total_path: Some("$.total_count".into()),
})
.request_delay(Duration::from_millis(200)),
)?;
OAuth2 client credentials
use faucet_stream::{Auth, fetch_oauth2_token};
let token = fetch_oauth2_token(
"https://auth.example.com/oauth/token",
"client-id",
"client-secret",
&["read:data".into()],
).await?;
let config = RestStreamConfig::new("https://api.example.com", "/data")
.auth(Auth::Bearer(token));
Streaming page-by-page
Process records as each page arrives without waiting for all pages to complete:
use faucet_stream::{RestStream, RestStreamConfig, PaginationStyle};
use futures::StreamExt;
let stream = RestStream::new(
RestStreamConfig::new("https://api.example.com", "/v1/events")
.records_path("$.events[*]")
.pagination(PaginationStyle::Cursor {
next_token_path: "$.next_cursor".into(),
param_name: "cursor".into(),
}),
)?;
let mut pages = stream.stream_pages();
while let Some(result) = pages.next().await {
let records = result?;
println!("processing page of {} records", records.len());
}
Typed deserialization
use serde::Deserialize;
use faucet_stream::{RestStream, RestStreamConfig};
#[derive(Debug, Deserialize)]
struct User {
id: u64,
name: String,
email: String,
}
let stream = RestStream::new(
RestStreamConfig::new("https://api.example.com", "/users")
.records_path("$.data[*]"),
)?;
let users: Vec<User> = stream.fetch_all_as::<User>().await?;
Pagination Styles
| Style |
Use When |
Cursor |
API returns a next-page token in the response body |
PageNumber |
API uses ?page=1&per_page=100 style |
Offset |
API uses ?offset=0&limit=50 style |
LinkHeader |
API returns pagination in Link HTTP header (GitHub-style) |
License
Licensed under either of
at your option.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.