faucet-source-rest 1.0.0

REST API source connector for the faucet-stream ecosystem

faucet-source-rest

A declarative, config-driven REST API client with pluggable authentication, pagination, schema inference, and incremental replication. Attach transforms by wrapping the source with faucet_core::TransformingSource.

Part of the faucet-stream ecosystem.

Installation

[dependencies]
faucet-source-rest = "1.0"
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

faucet-stream = { version = "1.0", features = ["source-rest"] }

Quick Start

use faucet_source_rest::{RestStream, RestStreamConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = RestStreamConfig::new("https://api.example.com", "/users")
        .max_pages(5);

    let stream = RestStream::new(config)?;
    let records = stream.fetch_all().await?;

    for record in &records {
        println!("{}", record);
    }
    Ok(())
}

Configuration

Core Request Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| base_url | String | "" | Base URL of the API (trailing slash is trimmed) |
| path | String | "" | URL path relative to base_url. Supports {key} placeholders for partition substitution (e.g. "/orgs/{org_id}/users") |
| method | Method | GET | HTTP method for the request |
| auth | Auth | Auth::None | Authentication method (see Authentication section) |
| headers | HeaderMap | empty | Additional HTTP headers to include in every request |
| query_params | HashMap<String, String> | empty | Query parameters to include in every request |
| body | Option<Value> | None | JSON request body (sent with Content-Type: application/json) |

Pagination Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| pagination | PaginationStyle | PaginationStyle::None | Pagination strategy (see Pagination section) |
| records_path | Option<String> | None | JSONPath expression to extract records from the response body |
| max_pages | Option<usize> | Some(100) | Maximum number of pages to fetch. Acts as a hard cap across all pagination styles |
| request_delay | Option<Duration> | None | Delay between consecutive page requests |

Reliability Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | Option<Duration> | Some(30s) | HTTP request timeout per individual request |
| max_retries | u32 | 3 | Maximum number of retries on transient failures |
| retry_backoff | Duration | 1s | Base duration for exponential backoff between retries. The per-attempt sleep is retry_backoff × 2^attempt, capped at 60s and scaled by random jitter in [0.5, 1.5) (decorrelated across concurrent retries) to avoid a thundering herd. On 429, the server's Retry-After (delta-seconds or an RFC 7231 HTTP-date) is honored instead. |
| tolerated_http_errors | Vec<u16> | [] | HTTP status codes treated as an empty page on the first request only (i.e. a legitimately absent/empty resource). Mid-pagination, a tolerated status is surfaced as an error instead of silently ending the stream; otherwise a transient failure on page N would drop every later page as a "successful" run. Only safe for genuinely-empty resources. |
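
The backoff rule described for retry_backoff can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: backoff_delay, retry_after_delay, and MAX_BACKOFF are hypothetical names, the jitter factor is passed in by the caller rather than drawn from a random number generator, and applying the 60s cap before the jitter is an assumption about ordering. Only the delta-seconds form of Retry-After is parsed here.

```rust
use std::time::Duration;

/// Upper bound on the un-jittered sleep (60s, as stated for retry_backoff).
const MAX_BACKOFF: Duration = Duration::from_secs(60);

/// Hypothetical helper: sleep before retry number `attempt` (0-based).
/// `jitter` is expected to lie in [0.5, 1.5); a real client would draw it at random.
fn backoff_delay(base: Duration, attempt: u32, jitter: f64) -> Duration {
    // 2^attempt, saturating instead of overflowing for very large attempts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // base * 2^attempt, falling back to the cap if the multiplication overflows.
    let exponential = base.checked_mul(factor).unwrap_or(MAX_BACKOFF);
    // Assumption: cap first, then scale by the jitter factor.
    exponential.min(MAX_BACKOFF).mul_f64(jitter)
}

/// Hypothetical helper: parse the delta-seconds form of a Retry-After header.
/// The HTTP-date form is not handled in this sketch.
fn retry_after_delay(header_value: &str) -> Option<Duration> {
    header_value.trim().parse::<u64>().ok().map(Duration::from_secs)
}
```

With the default retry_backoff of 1s and a jitter factor of 1.0, attempts 0 through 3 would sleep for 1s, 2s, 4s, and 8s, and any attempt from 6 onward would sleep for the 60s cap.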

A 204 No Content response — or any 2xx with an empty / whitespace-only body — is treated as an empty page ("no data"), not a parse error. A non-empty body that isn't valid JSON still fails loudly with FaucetError::Json.
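
The rules for tolerated statuses and empty bodies amount to a small decision table. The sketch below restates them in plain Rust; PageOutcome and classify_response are hypothetical names used only for illustration and are not part of the crate's API. Retries for transient failures are left out, so Error here means the outcome once retries are exhausted.

```rust
/// Hypothetical outcome of inspecting one HTTP response.
#[derive(Debug, PartialEq)]
enum PageOutcome {
    /// Non-empty 2xx body: hand it to the JSON parser (which may still fail).
    ParseJson,
    /// Treated as "no data" rather than as a failure.
    EmptyPage,
    /// Surfaced to the caller as an error.
    Error,
}

/// Hypothetical helper mirroring the prose above.
fn classify_response(
    status: u16,
    body: &str,
    is_first_request: bool,
    tolerated_http_errors: &[u16],
) -> PageOutcome {
    if (200..300).contains(&status) {
        // 204, or any 2xx with an empty / whitespace-only body, is an empty page.
        if body.trim().is_empty() {
            PageOutcome::EmptyPage
        } else {
            PageOutcome::ParseJson
        }
    } else if is_first_request && tolerated_http_errors.contains(&status) {
        // A tolerated status only counts as "empty" on the very first request.
        PageOutcome::EmptyPage
    } else {
        // Includes tolerated statuses that show up mid-pagination.
        PageOutcome::Error
    }
}
```

Under these assumptions, a 404 listed in tolerated_http_errors yields an empty page on the first request but an error on any later page.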

Replication Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| replication_method | ReplicationMethod | FullTable | FullTable fetches all records; Incremental filters by bookmark |
| replication_key | Option<String> | None | Field name used for incremental replication bookmarking |
| start_replication_value | Option<Value> | None | Bookmark value; records at or before this value are filtered out in incremental mode |
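
The incremental filter can be illustrated without the crate. The sketch below is an assumption-laden model, not the library's code: Record and apply_bookmark are hypothetical, the replication key is a plain string compared lexicographically (which orders ISO 8601 timestamps of identical format correctly), and the new bookmark is assumed to be the largest key among the kept records, falling back to the previous bookmark when nothing new arrived.

```rust
/// Hypothetical record with a replication key in ISO 8601 form.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    id: u32,
    updated_at: String,
}

/// Drop records at or before `start`, then report the newest key seen.
fn apply_bookmark(records: Vec<Record>, start: Option<&str>) -> (Vec<Record>, Option<String>) {
    let kept: Vec<Record> = records
        .into_iter()
        .filter(|r| match start {
            // Strictly greater: records at or before the bookmark are filtered out.
            Some(bookmark) => r.updated_at.as_str() > bookmark,
            // No bookmark yet: keep everything.
            None => true,
        })
        .collect();

    // Assumption: the new bookmark is the maximum key among kept records.
    let newest = kept.iter().map(|r| r.updated_at.clone()).max();
    // If nothing was kept, carry the previous bookmark forward.
    let bookmark = newest.or_else(|| start.map(str::to_string));
    (kept, bookmark)
}
```

The returned pair has the same shape as the (records, bookmark) tuple that fetch_all_incremental() returns, so the bookmark can be persisted and passed back as the next start_replication_value.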

Singer / Meltano Metadata Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | Option<String> | None | Human-readable stream name (used in logging and Singer SCHEMA messages) |
| primary_keys | Vec<String> | [] | Field names that uniquely identify a record (Singer key_properties) |
| schema | Option<Value> | None | JSON Schema describing the structure of each record |
| schema_sample_size | usize | 100 | Maximum number of records to sample when inferring schema. 0 means sample all available records |

Partition Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| partitions | Vec<HashMap<String, Value>> | [] | Each entry is a context map substituted into path placeholders. The stream executes once per partition and concatenates results. Empty means run once with no substitution |
| partition_concurrency | Option<usize> | None | Maximum number of partitions to fetch concurrently. None means sequential processing |
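
To make the placeholder substitution concrete, here is a minimal sketch of how one request URL per partition could be derived. It is not the crate's code: render_path, build_url, and partition_urls are hypothetical helpers, partition values are simplified to strings (the real field holds JSON values), and trimming every trailing slash from base_url is an assumption.

```rust
use std::collections::HashMap;

/// Replace each `{key}` placeholder in `template` with the partition's value.
fn render_path(template: &str, partition: &HashMap<String, String>) -> String {
    let mut path = template.to_string();
    for (key, value) in partition {
        // `{{{key}}}` formats to a literal `{`, the key, and a literal `}`.
        path = path.replace(&format!("{{{key}}}"), value);
    }
    path
}

/// Join base URL and path, trimming trailing slashes from the base.
fn build_url(base_url: &str, path: &str) -> String {
    format!("{}{}", base_url.trim_end_matches('/'), path)
}

/// One URL per partition; an empty partition list means a single run
/// with the path used as-is.
fn partition_urls(
    base_url: &str,
    template: &str,
    partitions: &[HashMap<String, String>],
) -> Vec<String> {
    if partitions.is_empty() {
        return vec![build_url(base_url, template)];
    }
    partitions
        .iter()
        .map(|p| build_url(base_url, &render_path(template, p)))
        .collect()
}
```

For example, the template "/orgs/{org_id}/users" with partitions for "acme" and "globex" would expand to two URLs, one per organization, in the order the partitions were supplied.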

Authentication

The Auth enum supports the following strategies:

| Variant | Fields | Description |
| --- | --- | --- |
| None | -- | No authentication |
| Bearer { token } | String | Bearer token in the Authorization header |
| Basic { username, password } | String, String | HTTP Basic authentication |
| ApiKey { header, value } | String, String | API key sent in a custom request header |
| ApiKeyQuery { param, value } | String, String | API key sent as a query parameter (e.g. ?api_key=secret) |
| OAuth2 { token_url, client_id, client_secret, scopes, expiry_ratio } | see below | Client credentials OAuth2 flow with token caching |
| TokenEndpoint { url, method, headers, body, token_path, expiry_path, expiry_ratio, response_validator } | see below | Fetch token from an arbitrary HTTP endpoint |
| Custom { headers } | HashMap<String, String> | Arbitrary custom headers attached to every request |

OAuth2 fields:

  • token_url (String): Token endpoint URL
  • client_id (String): OAuth2 client ID
  • client_secret (String): OAuth2 client secret
  • scopes (Vec<String>): Requested scopes
  • expiry_ratio (f64): Fraction of expires_in after which the token is refreshed. Must be in (0.0, 1.0]. Defaults to 0.9

TokenEndpoint fields:

  • url (String): Token endpoint URL
  • method (Method): HTTP method (e.g. POST, GET)
  • headers (HeaderMap): Headers for the token request
  • body (Option<Value>): Optional JSON body for the token request
  • token_path (String): JSONPath expression to extract the token from the response
  • expiry_path (Option<String>): JSONPath to extract expiry (seconds) from the response
  • expiry_ratio (f64): Proactive refresh ratio. Defaults to 0.9
  • response_validator (Option<ResponseValidator>): Custom success check for the token response
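
The effect of expiry_ratio is easiest to see as arithmetic: the cached token is refreshed once the given fraction of its lifetime has elapsed. The helper below is a hypothetical sketch of that calculation (refresh_after is not a function of the crate), with the (0.0, 1.0] range check taken from the OAuth2 field description.

```rust
use std::time::Duration;

/// How long a freshly issued token may be reused before refreshing it.
/// Returns an error message if `expiry_ratio` is outside (0.0, 1.0].
fn refresh_after(expires_in: Duration, expiry_ratio: f64) -> Result<Duration, String> {
    // Written as a negated conjunction so that NaN is rejected as well.
    if !(expiry_ratio > 0.0 && expiry_ratio <= 1.0) {
        return Err(format!("expiry_ratio must be in (0.0, 1.0], got {expiry_ratio}"));
    }
    Ok(expires_in.mul_f64(expiry_ratio))
}
```

With the default ratio of 0.9, a token whose expires_in is one hour would be refreshed after roughly 54 minutes, leaving a safety margin before the server-side expiry.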

Pagination

The PaginationStyle enum supports:

| Style | Fields | Stops When |
| --- | --- | --- |
| None | -- | After first page |
| Cursor { next_token_path, param_name } | JSONPath to next token, query param name | Next token is null or absent, or loop detected |
| LinkHeader | -- | No rel="next" in the Link response header |
| NextLinkInBody { next_link_path } | JSONPath to next page URL | URL is absent, null, or empty |
| PageNumber { param_name, start_page, page_size, page_size_param } | Query param name, starting page number, optional page size and its param name | Response returns zero records, or the same page body is returned twice in a row |
| Offset { offset_param, limit_param, limit, total_path } | Offset param, limit param, records per page, optional JSONPath to total count | A zero-record page, offset reaches total (via JSONPath), or fewer records than limit |

Cursor, LinkHeader, and NextLinkInBody include loop detection -- if the same cursor/link is returned twice in a row, pagination stops. Offset stops on a zero-record page (so a stalled offset cannot loop forever). PageNumber stops when a page returns zero records, and additionally detects content stagnation -- if an API clamps an out-of-range page to the last page and re-returns the identical body, pagination stops rather than looping to max_pages. For all styles, max_pages is a hard cap.
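
The stop conditions above can be modelled in a few lines. The following is a simplified sketch rather than the crate's pagination code: fake_fetch stands in for an HTTP call, paginate_cursor and offset_is_done are hypothetical names, and whether the real client keeps the records of the page on which a loop is detected is an assumption made here.

```rust
/// Hypothetical stand-in for one HTTP call: returns (records, next cursor).
fn fake_fetch(cursor: Option<&str>) -> (Vec<u32>, Option<String>) {
    match cursor {
        None => (vec![1, 2], Some("a".to_string())),
        Some("a") => (vec![3], Some("b".to_string())),
        // A misbehaving API that keeps returning the same cursor.
        _ => (vec![4], Some("b".to_string())),
    }
}

/// Cursor pagination with loop detection and a hard page cap.
fn paginate_cursor(
    fetch: impl Fn(Option<&str>) -> (Vec<u32>, Option<String>),
    max_pages: usize,
) -> Vec<u32> {
    let mut records = Vec::new();
    let mut cursor: Option<String> = None;
    // max_pages bounds the loop regardless of what the API returns.
    for _ in 0..max_pages {
        let (page, next) = fetch(cursor.as_deref());
        records.extend(page);
        match next {
            // Continue only if the cursor actually changed.
            Some(n) if cursor.as_deref() != Some(n.as_str()) => cursor = Some(n),
            // Absent cursor, or the same cursor twice in a row: stop.
            _ => break,
        }
    }
    records
}

/// Offset pagination: the three stop conditions listed in the table.
fn offset_is_done(records_on_page: usize, next_offset: usize, limit: usize, total: Option<usize>) -> bool {
    records_on_page == 0
        || records_on_page < limit
        || total.map_or(false, |t| next_offset >= t)
}
```

Calling paginate_cursor(fake_fetch, 10) ends after the third page because the cursor repeats, while paginate_cursor(fake_fetch, 2) ends after the second page because the cap is reached first.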

Config Loading

use faucet_core::config::{load_json, load_env_file};
use faucet_source_rest::RestStreamConfig;

// From JSON file
let config: RestStreamConfig = load_json("config.json")?;

// From .env file + environment variables
let config: RestStreamConfig = load_env_file(".env", "REST")?;

Example JSON config

{
  "base_url": "https://api.github.com",
  "path": "/repos/PawanSikawat/faucet-stream/issues",
  "method": "GET",
  "auth": {
    "type": "bearer",
    "config": {
      "token": "ghp_xxxxxxxxxxxx"
    }
  },
  "query_params": {
    "state": "open",
    "per_page": "100"
  },
  "pagination": {
    "type": "LinkHeader"
  },
  "records_path": "$[*]",
  "max_pages": 10,
  "timeout": 30,
  "max_retries": 3,
  "retry_backoff": 1,
  "tolerated_http_errors": [],
  "replication_method": "FullTable",
  "primary_keys": ["id"],
  "schema_sample_size": 100
}

Example .env file

REST_BASE_URL=https://api.github.com
REST_PATH=/repos/PawanSikawat/faucet-stream/issues
REST_METHOD=GET
REST_MAX_PAGES=10
REST_TIMEOUT=30
REST_MAX_RETRIES=3
REST_RETRY_BACKOFF=1
REST_SCHEMA_SAMPLE_SIZE=100
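
Judging from the example above, each variable appears to be the upper-cased field name behind the prefix passed to load_env_file, joined by an underscore. Under that assumption (the loader's actual rules may differ), the mapping can be sketched as follows; env_key_to_field is a hypothetical helper and not part of faucet_core.

```rust
/// Map an environment variable name to a config field name, e.g.
/// ("REST", "REST_BASE_URL") -> Some("base_url").
/// Returns None for variables that do not carry the prefix.
fn env_key_to_field(prefix: &str, key: &str) -> Option<String> {
    // Require "<PREFIX>_" at the start of the variable name.
    let rest = key.strip_prefix(prefix)?.strip_prefix('_')?;
    if rest.is_empty() {
        return None;
    }
    // Assumption: field names are the lower-cased remainder.
    Some(rest.to_ascii_lowercase())
}
```

Variables without the prefix, such as HOME or PATH, are ignored by this sketch, which keeps unrelated environment settings out of the config.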

Config Schema Introspection

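use faucet_core::Source;
use faucet_source_rest::RestStream;

// `config` is a RestStreamConfig, built or loaded as in the sections above.
let stream = RestStream::new(config)?;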
let schema = stream.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);

Methods

| Method | Return Type | Description |
| --- | --- | --- |
| RestStream::new(config) | Result<Self, FaucetError> | Create a new stream. Validates auth at construction time |
| fetch_all() | Result<Vec<Value>, FaucetError> | Fetch all records across all pages and partitions |
| fetch_all_as::<T>() | Result<Vec<T>, FaucetError> | Fetch all records and deserialize into typed structs |
| fetch_all_incremental() | Result<(Vec<Value>, Option<Value>), FaucetError> | Fetch records with incremental replication, returning records and bookmark |
| infer_schema() | Result<Value, FaucetError> | Infer a JSON Schema from sampled records (or return the configured schema) |
| stream_pages() | Pin<Box<dyn Stream<Item = Result<Vec<Value>, FaucetError>>>> | Stream records page-by-page without waiting for all pages |

RestStream also implements the faucet_core::Source trait. The trait's stream_pages method is what Pipeline::run drives internally for memory-bounded streaming. The inherent stream_pages() method, which yields Vec<Value> pages, remains for direct callers who do not need per-page bookmarks.

Examples

Cursor-paginated API with bearer auth

use faucet_source_rest::{RestStream, RestStreamConfig, Auth, PaginationStyle};

let config = RestStreamConfig::new("https://api.example.com", "/v2/contacts")
    .auth(Auth::Bearer {
        token: "your-api-token".into(),
    })
    .pagination(PaginationStyle::Cursor {
        next_token_path: "$.meta.next_cursor".into(),
        param_name: "cursor".into(),
    })
    .records_path("$.data[*]")
    .max_pages(50);

let stream = RestStream::new(config)?;
let contacts = stream.fetch_all().await?;

OAuth2 client credentials with incremental replication

use faucet_source_rest::*;
use serde_json::json;

let config = RestStreamConfig::new("https://api.example.com", "/v1/events")
    .auth(Auth::OAuth2 {
        token_url: "https://auth.example.com/oauth/token".into(),
        client_id: "my-client-id".into(),
        client_secret: "my-client-secret".into(),
        scopes: vec!["read:events".into()],
        expiry_ratio: 0.9,
    })
    .pagination(PaginationStyle::Offset {
        offset_param: "offset".into(),
        limit_param: "limit".into(),
        limit: 100,
        total_path: Some("$.total".into()),
    })
    .records_path("$.events[*]")
    .replication_method(ReplicationMethod::Incremental)
    .replication_key("updated_at")
    .start_replication_value(json!("2025-01-01T00:00:00Z"));

let stream = RestStream::new(config)?;
let (records, bookmark) = stream.fetch_all_incremental().await?;
println!("Fetched {} records, new bookmark: {:?}", records.len(), bookmark);

Multi-partition concurrent fetch

use faucet_source_rest::{RestStream, RestStreamConfig, Auth};
use std::collections::HashMap;
use serde_json::json;

let config = RestStreamConfig::new("https://api.example.com", "/orgs/{org_id}/members")
    .auth(Auth::Bearer {
        token: "token".into(),
    })
    .add_partition(HashMap::from([("org_id".into(), json!("acme"))]))
    .add_partition(HashMap::from([("org_id".into(), json!("globex"))]))
    .add_partition(HashMap::from([("org_id".into(), json!("initech"))]))
    .partition_concurrency(Some(3))
    .records_path("$.members[*]");

let stream = RestStream::new(config)?;
let all_members = stream.fetch_all().await?;

Feature Flags

| Feature | Default | Description |
| --- | --- | --- |
| transform-flatten | yes | Enable the Flatten record transform |
| transform-rename-keys | yes | Enable the RenameKeys regex-based transform |
| transform-keys-case | yes | Enable the KeysCase transform (snake / camel / pascal / kebab / screaming_snake) |
| transform-select | no | Enable the Select transform (keep listed top-level fields) |
| transform-drop | no | Enable the Drop transform (remove listed top-level fields) |
| transform-set | no | Enable the Set transform (insert/overwrite constants) |
| transform-rename-field | no | Enable the RenameField transform (exact-name rename) |
| transform-cast | no | Enable the Cast transform (per-field type coercion with on_error policy) |
| transform-redact | no | Enable the Redact transform (mask listed field values) |
| transform-value-case | no | Enable the ValueCase transform (lower / upper / trim string values) |
| transform-spell-symbols | no | Enable the SpellSymbols transform (spell out %, #, $, … in keys) |
| transforms | no | Enable every transform feature |

License

Licensed under MIT or Apache-2.0.