faucet-source-graphql
A config-driven GraphQL API source with cursor-based pagination, JSONPath record extraction, and pluggable authentication. Transient HTTP failures (5xx / connection resets) are retried with exponential backoff.
Part of the faucet-stream ecosystem.
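
The retry behaviour mentioned above can be pictured with a small, standard-library-only sketch. The names `backoff_delay` and `is_transient_status`, the 100 ms base, and the 5 s cap are assumptions made for illustration; they are not taken from this crate, and the crate's actual schedule may differ.

```rust
use std::time::Duration;

/// Hypothetical helper: delay before retry number `attempt` (0-based).
/// The delay doubles from `base` on every attempt and is capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 2^attempt, falling back to u32::MAX so large attempt counts cannot overflow.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}

/// In this sketch only 5xx statuses count as transient.
/// Connection resets would be classified separately, at the transport level.
fn is_transient_status(status: u16) -> bool {
    (500..600).contains(&status)
}

fn main() {
    let base = Duration::from_millis(100); // assumed base delay
    let max = Duration::from_secs(5); // assumed cap
    for attempt in 0..8 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt, base, max));
    }
    println!("503 transient? {}", is_transient_status(503));
    println!("404 transient? {}", is_transient_status(404));
}
```

Capping the delay keeps a long outage from turning into multi-minute sleeps, and treating 4xx responses as non-transient avoids retrying requests that can never succeed.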
Installation
[]
= "1.0"
= { = "1", = ["full"] }
Or via the umbrella crate:
= { = "1.0", = ["source-graphql"] }
Quick Start
use ;
async
Configuration
GraphqlStreamConfig
| Field | Type | Default | Description |
|---|---|---|---|
| `endpoint` | `String` | (required) | GraphQL endpoint URL |
| `query` | `String` | (required) | The GraphQL query string |
| `variables` | `Value` | `{}` | Variables to pass with the query |
| `auth` | `GraphqlAuth` | `GraphqlAuth::None` | Authentication method |
| `headers` | `HeaderMap` | empty | Additional request headers |
| `records_path` | `Option<String>` | `None` | JSONPath expression to extract records from the response. When `None`, the `data` field of the response is returned as a single record |
| `pagination` | `Option<GraphqlPagination>` | `None` | Pagination configuration. `None` for single-page queries |
| `max_pages` | `Option<usize>` | `None` | Maximum number of pages to fetch |
| `batch_size` | `usize` | `1000` (`DEFAULT_BATCH_SIZE`) | Records per emitted `StreamPage` and the value injected as the GraphQL `first:` cursor argument. See Streaming and batching below |
Authentication (GraphqlAuth)
| Variant | Description |
|---|---|
| `None` | No authentication |
| `Bearer { token }` | Bearer token in the `Authorization` header |
| `Custom { headers }` | Custom headers map (`HashMap<String, String>`) attached to every request |
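
As a rough illustration of what each variant contributes to a request, the sketch below maps a stand-in enum to header pairs. `AuthSketch` and `auth_headers` are hypothetical names that only mirror the table above; they are not the crate's types, and the real implementation may build its headers differently.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the three documented variants (not the crate's type).
enum AuthSketch {
    None,
    Bearer { token: String },
    Custom { headers: HashMap<String, String> },
}

/// Returns the (name, value) header pairs this sketch would attach to a request.
fn auth_headers(auth: &AuthSketch) -> Vec<(String, String)> {
    match auth {
        // No authentication: nothing is added.
        AuthSketch::None => Vec::new(),
        // Bearer: a single `Authorization: Bearer <token>` header.
        AuthSketch::Bearer { token } => {
            vec![("Authorization".to_string(), format!("Bearer {token}"))]
        }
        // Custom: every entry of the map becomes a header.
        AuthSketch::Custom { headers } => {
            let mut pairs: Vec<(String, String)> = headers
                .iter()
                .map(|(name, value)| (name.clone(), value.clone()))
                .collect();
            pairs.sort(); // HashMap order is unspecified; sort for stable output
            pairs
        }
    }
}

fn main() {
    let bearer = AuthSketch::Bearer { token: "example-token".to_string() };
    println!("{:?}", auth_headers(&bearer));

    let mut map = HashMap::new();
    map.insert("X-Api-Key".to_string(), "example-key".to_string());
    println!("{:?}", auth_headers(&AuthSketch::Custom { headers: map }));

    println!("{:?}", auth_headers(&AuthSketch::None));
}
```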
Pagination (GraphqlPagination)
Cursor-based pagination following the Relay specification with pageInfo { hasNextPage, endCursor }.
| Field | Type | Default | Description |
|---|---|---|---|
| `has_next_page_path` | `String` | `"$.data.*.pageInfo.hasNextPage"` | JSONPath to the `hasNextPage` boolean in the response |
| `cursor_path` | `String` | `"$.data.*.pageInfo.endCursor"` | JSONPath to the `endCursor` string in the response |
| `cursor_variable` | `String` | `"after"` | Name of the cursor variable in the GraphQL query |
| `page_size_variable` | `String` | `"first"` | Name of the page-size variable that `GraphqlStreamConfig::batch_size` is injected into |
Pagination includes loop detection -- if the same cursor is returned twice in a row, pagination stops.
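
The loop-detection rule can be sketched as a plain pagination loop. This is a minimal model using only the standard library: `PageSketch`, `paginate`, and the closure standing in for the HTTP call are hypothetical, and the crate's own control flow may be organised differently.

```rust
/// Hypothetical view of one response after the JSONPath lookups have run.
struct PageSketch {
    records: Vec<u32>,
    has_next_page: bool,
    end_cursor: Option<String>,
}

/// Fetches pages until the server reports no next page, the cursor stops
/// advancing, or `max_pages` is reached. `fetch` stands in for the request.
fn paginate<F>(mut fetch: F, max_pages: Option<usize>) -> Vec<Vec<u32>>
where
    F: FnMut(Option<&str>) -> PageSketch,
{
    let mut pages = Vec::new();
    let mut cursor: Option<String> = None;
    loop {
        // Optional page limit.
        if max_pages.map_or(false, |limit| pages.len() >= limit) {
            break;
        }
        let page = fetch(cursor.as_deref());
        pages.push(page.records);
        if !page.has_next_page {
            break;
        }
        match page.end_cursor {
            // Same cursor twice in a row: the server is not advancing, so stop.
            Some(next) if cursor.as_deref() == Some(next.as_str()) => break,
            Some(next) => cursor = Some(next),
            // hasNextPage was true but no cursor came back: nothing to follow.
            None => break,
        }
    }
    pages
}

fn main() {
    // A misbehaving server that always answers with the same cursor.
    let pages = paginate(
        |_cursor| PageSketch {
            records: vec![1, 2],
            has_next_page: true,
            end_cursor: Some("stuck".to_string()),
        },
        None,
    );
    println!("fetched {} pages before loop detection stopped", pages.len());
}
```

Without the repeated-cursor check, a server that keeps returning `hasNextPage: true` with an unchanged `endCursor` would keep the loop running until `max_pages` (if set) cut it off.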
Streaming and batching
GraphqlStream implements Source::stream_pages: each upstream GraphQL response is emitted as one StreamPage, so a pipeline writes to the sink as each page arrives instead of buffering the full result set client-side.
- `batch_size` (default `1000`, max `1_000_000`) maps directly to the GraphQL `first:` cursor argument (or whatever variable `page_size_variable` names). Each emitted `StreamPage` carries up to `batch_size` records. The upstream is what actually decides how many records to return per request, so a server that caps `first:` will produce smaller pages than requested.
- `batch_size = 0` is the "no batching" sentinel: the page-size variable is omitted from the request entirely so the upstream uses its own default page size, and the whole response is emitted as a single page. Use it for tiny lookup queries, or when the upstream's default page size already matches what the sink wants. If the upstream schema declares the page-size variable as non-null (`first: Int!`), the server will respond with a GraphQL validation error and the stream will surface a `FaucetError::Config` explaining the contract; pick a non-zero `batch_size` in that case.
- Bookmarks are always `None` today: the GraphQL source has no incremental-replication mode yet. The `max_pages` truncation guard is wired structurally so a future incremental mode inherits the same trailing-checkpoint behaviour the REST source uses.
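
The relationship between `batch_size` and the request variables described above can be modelled as follows. This is a sketch under assumptions: `page_variables` is a hypothetical helper, values are kept as pre-rendered JSON text to avoid a JSON dependency, and omitting the cursor variable on the first request is an assumption rather than documented behaviour.

```rust
use std::collections::BTreeMap;

/// Builds the pagination-related variables for one request.
/// Values are JSON text (e.g. `1000` or `"abc"`), keyed by variable name.
fn page_variables(
    batch_size: usize,
    page_size_variable: &str,
    cursor_variable: &str,
    cursor: Option<&str>,
) -> BTreeMap<String, String> {
    let mut vars = BTreeMap::new();
    // batch_size = 0 is the "no batching" sentinel: leave the page-size
    // variable out so the upstream applies its own default.
    if batch_size != 0 {
        vars.insert(page_size_variable.to_string(), batch_size.to_string());
    }
    // Assumption: the cursor variable is only sent once a cursor is known.
    if let Some(value) = cursor {
        vars.insert(cursor_variable.to_string(), format!("\"{value}\""));
    }
    vars
}

fn main() {
    // Default batch size, first request.
    println!("{:?}", page_variables(1000, "first", "after", None));
    // Follow-up request with a cursor.
    println!("{:?}", page_variables(1000, "first", "after", Some("abc")));
    // Sentinel: the page-size variable is absent.
    println!("{:?}", page_variables(0, "first", "after", None));
}
```

With `batch_size = 0` the map contains no `first` entry at all, which is why a schema that declares `first: Int!` rejects the request.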
Config Loading
use ;
use GraphqlStreamConfig;
let config: GraphqlStreamConfig = load_json?;
let config: GraphqlStreamConfig = load_env_file?;
Example JSON config
Example .env file
GRAPHQL_ENDPOINT=https://api.github.com/graphql
GRAPHQL_QUERY=query { viewer { login } }
GRAPHQL_MAX_PAGES=10
Config Schema Introspection
use Source;
let stream = new;
let schema = stream.config_schema;
println!;
Examples
Simple single-page query
use ;
use GraphqlAuth;
use json;
let config = new
.variables
.auth;
let stream = new;
let records = stream.fetch_all.await?;
Relay cursor pagination with record extraction
use ;
use ;
use json;
let config = new
.auth
.records_path
.pagination
.with_batch_size
.max_pages;
let stream = new;
let users = stream.fetch_all.await?;
println!;
Using with a Pipeline
use ;
use ;
let source = new;
let pipeline = new;
let result = pipeline.run.await?;
println!;
License
Licensed under MIT or Apache-2.0.