use faucet_core::{DEFAULT_BATCH_SIZE, Source};
use faucet_source_graphql::config::GraphqlPagination;
use faucet_source_graphql::{GraphqlStream, GraphqlStreamConfig};
use futures::StreamExt;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, Request, ResponseTemplate};
/// Returns the `variables` member of a mock request's JSON body.
///
/// Yields [`Value::Null`] when the body carries no `variables` entry.
///
/// # Panics
///
/// Panics if the request body is not valid JSON.
fn request_variables(req: &Request) -> Value {
    let payload: Value = serde_json::from_slice(&req.body).expect("request body is JSON");
    match payload.get("variables") {
        Some(vars) => vars.clone(),
        None => Value::Null,
    }
}
/// Reads the `after` variable of a mock request.
///
/// Returns `None` when the variable is absent or is not a JSON string.
fn request_cursor(req: &Request) -> Option<String> {
    let variables = request_variables(req);
    let after = variables.get("after")?;
    after.as_str().map(str::to_owned)
}
/// Reads the `first` variable of a mock request.
///
/// Returns `None` when the variable is absent or is not representable as a `u64`.
fn request_first(req: &Request) -> Option<u64> {
    let variables = request_variables(req);
    let first = variables.get("first")?;
    first.as_u64()
}
/// Builds a connection-style `users` response body.
///
/// The page holds `n` edges whose nodes carry the ids `start..start + n`.
/// `pageInfo.hasNextPage` is true exactly when `next_cursor` is `Some`, and
/// `pageInfo.endCursor` is that cursor (or JSON `null`).
fn make_page(start: u64, n: u64, next_cursor: Option<&str>) -> Value {
    let edges: Vec<Value> = (start..start + n)
        .map(|id| json!({ "node": { "id": id } }))
        .collect();
    let has_next_page = next_cursor.is_some();
    json!({
        "data": {
            "users": {
                "edges": edges,
                "pageInfo": {
                    "hasNextPage": has_next_page,
                    "endCursor": next_cursor,
                }
            }
        }
    })
}
/// Mounts a `POST /` mock that serves `total` synthetic records in pages of `page_size`.
///
/// The responder derives the page index from the request's `after` variable:
/// no cursor means page 0 and `cursor-<N>` means page `N`. A next cursor is
/// advertised only while records remain after the current page. A request that
/// starts at or beyond `total` receives an empty page with no next cursor.
///
/// # Panics
///
/// Panics if `page_size` is zero while `total` is non-zero. The responder
/// panics when it receives a cursor that is not of the form `cursor-<N>`.
async fn mount_multi_page(server: &MockServer, total: u64, page_size: u64) {
    // A zero page size with records outstanding would make every page empty
    // while still advertising a next cursor, so pagination would never end.
    assert!(
        total == 0 || page_size > 0,
        "page_size must be greater than zero when total is non-zero"
    );
    Mock::given(method("POST"))
        .and(path("/"))
        .respond_with(move |req: &Request| {
            let cursor = request_cursor(req);
            let page_index: u64 = match cursor.as_deref() {
                None => 0,
                Some(s) => s
                    .strip_prefix("cursor-")
                    .and_then(|n| n.parse::<u64>().ok())
                    .expect("cursor must be `cursor-<N>`"),
            };
            let start = page_index * page_size;
            if start >= total {
                // Past the end of the data set: empty terminal page.
                return ResponseTemplate::new(200).set_body_json(make_page(start, 0, None));
            }
            let this_page = (total - start).min(page_size);
            let next_cursor =
                (start + this_page < total).then(|| format!("cursor-{}", page_index + 1));
            ResponseTemplate::new(200).set_body_json(make_page(
                start,
                this_page,
                next_cursor.as_deref(),
            ))
        })
        .mount(server)
        .await;
}
/// Builds a stream config aimed at `server` that pages through `users` with
/// the `first` / `after` variables and the given `batch_size`.
fn relay_config(server: &MockServer, batch_size: usize) -> GraphqlStreamConfig {
    let endpoint = server.uri();
    let pagination = GraphqlPagination {
        has_next_page_path: "$.data.users.pageInfo.hasNextPage".into(),
        cursor_path: "$.data.users.pageInfo.endCursor".into(),
        cursor_variable: "after".into(),
        page_size_variable: "first".into(),
    };
    let base = GraphqlStreamConfig::new(
        endpoint,
        "query($first: Int, $after: String) { users(first: $first, after: $after) { \
         edges { node { id } } pageInfo { hasNextPage endCursor } } }",
    );
    base.records_path("$.data.users.edges[*].node")
        .pagination(pagination)
        .with_batch_size(batch_size)
}
/// Two 503 responses followed by a successful page must still yield all three records.
#[tokio::test(flavor = "multi_thread")]
async fn retries_transient_5xx_then_succeeds() {
    let server = MockServer::start().await;

    // Mounted first: answers at most two requests with 503 and expects exactly two hits.
    let transient_failure = Mock::given(method("POST"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(503))
        .up_to_n_times(2)
        .expect(2);
    transient_failure.mount(&server).await;

    // Mounted second: serves a single three-record page.
    let success_body = make_page(0, 3, None);
    let eventual_success = Mock::given(method("POST"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(success_body));
    eventual_success.mount(&server).await;

    let config =
        GraphqlStreamConfig::new(server.uri(), "query { users { edges { node { id } } } }")
            .records_path("$.data.users.edges[*].node");
    let fetched = GraphqlStream::new(config)
        .fetch_all()
        .await
        .expect("should succeed after retries");
    assert_eq!(fetched.len(), 3);
}
/// 10_000 records served in upstream pages of 1_000 must surface as ten full
/// pages, none of which carries a bookmark.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_emits_one_page_per_upstream_response() {
    let server = MockServer::start().await;
    mount_multi_page(&server, 10_000, 1_000).await;

    let source = GraphqlStream::new(relay_config(&server, 1_000));
    let ctx = HashMap::<String, Value>::new();
    let mut pages = source.stream_pages(&ctx, 1_000);

    let mut page_sizes = Vec::new();
    while let Some(result) = pages.next().await {
        let page = result.expect("page ok");
        let size = page.records.len();
        page_sizes.push(size);
        assert_eq!(
            size, 1_000,
            "every page must be exactly batch_size records when total is a multiple"
        );
        assert!(
            page.bookmark.is_none(),
            "GraphQL source has no incremental mode yet; bookmark must be None"
        );
    }

    assert_eq!(page_sizes.len(), 10, "10_000 / 1_000 = 10 pages");
    assert_eq!(page_sizes.iter().sum::<usize>(), 10_000);
}
/// 2_500 records in pages of 1_000 must arrive as two full pages plus a 500-record tail.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_partial_final_page_holds_remainder() {
    let server = MockServer::start().await;
    mount_multi_page(&server, 2_500, 1_000).await;

    let source = GraphqlStream::new(relay_config(&server, 1_000));
    let ctx = HashMap::<String, Value>::new();
    let mut pages = source.stream_pages(&ctx, 1_000);

    let mut observed = Vec::new();
    while let Some(result) = pages.next().await {
        let page = result.expect("page ok");
        observed.push(page.records.len());
    }

    assert_eq!(
        observed,
        vec![1_000, 1_000, 500],
        "partial trailing page must hold the remainder"
    );
}
/// With `batch_size = 0` the source must not send the `first` variable, and the
/// single upstream page must be emitted as-is.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_batch_size_zero_omits_first_argument() {
    // Number of records the mock returns when no page size is requested.
    const UPSTREAM_DEFAULT: u64 = 7;

    let server = MockServer::start().await;
    Mock::given(method("POST"))
        .and(path("/"))
        .respond_with(
            ResponseTemplate::new(200).set_body_json(make_page(0, UPSTREAM_DEFAULT, None)),
        )
        .mount(&server)
        .await;

    let source = GraphqlStream::new(relay_config(&server, 0));
    let ctx: HashMap<String, Value> = HashMap::new();
    let mut pages = source.stream_pages(&ctx, 0);
    let mut collected: Vec<usize> = Vec::new();
    while let Some(page) = pages.next().await {
        let page = page.expect("page ok");
        collected.push(page.records.len());
    }

    // Inspect the recorded requests here, on the test task, rather than
    // asserting inside the mock responder: a violation then fails this test
    // directly with the message below.
    let requests = server
        .received_requests()
        .await
        .expect("request recording is enabled on the mock server");
    assert!(
        !requests.is_empty(),
        "the source must have issued at least one request"
    );
    for req in &requests {
        assert!(
            request_first(req).is_none(),
            "batch_size = 0 must omit the page-size variable so the upstream uses its default"
        );
    }

    assert_eq!(
        collected,
        vec![UPSTREAM_DEFAULT as usize],
        "batch_size = 0 must emit exactly one page sized by the upstream default"
    );
}
/// When the upstream rejects a missing `$first`, the first stream item must be
/// an error whose message mentions both `batch_size = 0` and `first`.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_batch_size_zero_errors_when_upstream_requires_first() {
    let server = MockServer::start().await;

    // HTTP 200 carrying a GraphQL-level validation error instead of data.
    let validation_failure = json!({
        "errors": [
            {
                "message": "Variable \"$first\" of required type \"Int!\" \
                            was not provided.",
                "locations": [{"line": 1, "column": 7}],
                "extensions": {"code": "GRAPHQL_VALIDATION_FAILED"}
            }
        ]
    });
    Mock::given(method("POST"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(validation_failure))
        .mount(&server)
        .await;

    let source = GraphqlStream::new(relay_config(&server, 0));
    let ctx = HashMap::<String, Value>::new();
    let mut pages = source.stream_pages(&ctx, 0);

    let outcome = pages.next().await.expect("at least one item");
    let err = outcome.expect_err(
        "schema requiring non-null first: should surface as a stream error \
         when batch_size = 0",
    );
    let msg = err.to_string();
    let explains_contract = msg.contains("batch_size = 0") && msg.contains("first");
    assert!(
        explains_contract,
        "error must explain the batch_size-0 / non-null-first contract; got {msg:?}"
    );
}
/// A single upstream page with zero records must surface as exactly one empty page.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_empty_response_yields_one_empty_page() {
    let server = MockServer::start().await;
    let empty_page = make_page(0, 0, None);
    Mock::given(method("POST"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(empty_page))
        .mount(&server)
        .await;

    let source = GraphqlStream::new(relay_config(&server, 100));
    let ctx = HashMap::<String, Value>::new();
    let mut pages = source.stream_pages(&ctx, 100);

    let mut observed: Vec<usize> = Vec::new();
    while let Some(result) = pages.next().await {
        let page = result.expect("page ok");
        observed.push(page.records.len());
    }

    assert_eq!(
        observed,
        vec![0],
        "an empty single-page response must yield exactly one (empty) page"
    );
}
/// With `max_pages(1)` only the first 100-record page is emitted, and it must
/// not carry a bookmark.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_max_pages_truncation_does_not_emit_spurious_bookmark() {
    let server = MockServer::start().await;
    mount_multi_page(&server, 1_000, 100).await;

    let source = GraphqlStream::new(relay_config(&server, 100).max_pages(1));
    let ctx = HashMap::<String, Value>::new();
    let mut pages = source.stream_pages(&ctx, 100);

    let mut sizes: Vec<usize> = Vec::new();
    let mut bookmarks: Vec<Option<Value>> = Vec::new();
    while let Some(result) = pages.next().await {
        let page = result.expect("page ok");
        sizes.push(page.records.len());
        bookmarks.push(page.bookmark);
    }

    assert_eq!(
        sizes,
        vec![100],
        "max_pages = 1 must truncate after one upstream page"
    );
    let any_bookmark = bookmarks.iter().any(Option::is_some);
    assert!(
        !any_bookmark,
        "no replication mode → every bookmark must stay None even on truncation; \
         got {bookmarks:?}"
    );
}
/// Checks that pages are delivered incrementally rather than after the whole
/// upstream has been drained.
///
/// The mock serves `TOTAL` one-record pages, each delayed by `PER_PAGE_DELAY`,
/// and counts every request it answers. Two signals are checked: the request
/// count observed when the first page arrives, and the elapsed-time ratio
/// between the first page and the full drain.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_first_page_arrives_before_full_drain() {
    use std::sync::atomic::{AtomicU64, Ordering};

    const TOTAL: u64 = 10;
    const PAGE_SIZE: u64 = 1;
    const PER_PAGE_DELAY: Duration = Duration::from_millis(80);

    let server = MockServer::start().await;
    let pages_seen = Arc::new(AtomicU64::new(0));
    let pages_seen_clone = Arc::clone(&pages_seen);
    Mock::given(method("POST"))
        .and(path("/"))
        .respond_with(move |req: &Request| {
            pages_seen_clone.fetch_add(1, Ordering::SeqCst);
            let cursor = request_cursor(req);
            let page_index: u64 = match cursor.as_deref() {
                None => 0,
                Some(s) => s
                    .strip_prefix("cursor-")
                    .and_then(|n| n.parse::<u64>().ok())
                    .expect("cursor must be `cursor-<N>`"),
            };
            let start = page_index * PAGE_SIZE;
            let remaining = TOTAL.saturating_sub(start);
            let this_page = remaining.min(PAGE_SIZE);
            let next_cursor = if start + this_page < TOTAL {
                Some(format!("cursor-{}", page_index + 1))
            } else {
                None
            };
            ResponseTemplate::new(200)
                .set_delay(PER_PAGE_DELAY)
                .set_body_json(make_page(start, this_page, next_cursor.as_deref()))
        })
        .mount(&server)
        .await;

    let source = GraphqlStream::new(relay_config(&server, PAGE_SIZE as usize));
    let ctx: HashMap<String, Value> = HashMap::new();
    let mut pages = source.stream_pages(&ctx, PAGE_SIZE as usize);

    let start = Instant::now();
    let first_page = pages
        .next()
        .await
        .expect("first page exists")
        .expect("page ok");
    let first_elapsed = start.elapsed();
    // Snapshot the request counter right away, before draining the rest.
    let requests_at_first_page = pages_seen.load(Ordering::SeqCst);
    assert_eq!(first_page.records.len(), PAGE_SIZE as usize);
    // Timing-independent check: if the whole upstream had been fetched before
    // the first page was handed out, the mock would already have answered
    // TOTAL requests.
    assert!(
        requests_at_first_page < TOTAL,
        "first page must be emitted before every upstream page has been requested; \
         upstream had already served {requests_at_first_page} of {TOTAL} requests"
    );

    while let Some(page) = pages.next().await {
        let _ = page.expect("page ok");
    }
    let full_elapsed = start.elapsed();
    assert!(
        first_elapsed * 3 < full_elapsed,
        "true streaming should deliver the first page well before the full drain \
         completes; first_elapsed = {first_elapsed:?}, full_elapsed = {full_elapsed:?}"
    );
}
/// With the default batch size, every request must carry
/// `DEFAULT_BATCH_SIZE` as its `first` variable.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_default_batch_size_drives_first_variable() {
    let server = MockServer::start().await;
    Mock::given(method("POST"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(make_page(0, 1, None)))
        .mount(&server)
        .await;

    let source = GraphqlStream::new(relay_config(&server, DEFAULT_BATCH_SIZE));
    let ctx: HashMap<String, Value> = HashMap::new();
    let mut pages = source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);
    let page = pages.next().await.expect("one page").expect("page ok");
    assert_eq!(page.records.len(), 1);
    assert!(pages.next().await.is_none(), "single-page response ends");

    // Inspect the recorded requests here, on the test task, rather than
    // asserting inside the mock responder: a wrong `first` value then fails
    // this test directly with the message below.
    let requests = server
        .received_requests()
        .await
        .expect("request recording is enabled on the mock server");
    assert!(
        !requests.is_empty(),
        "the source must have issued at least one request"
    );
    for req in &requests {
        assert_eq!(
            request_first(req),
            Some(DEFAULT_BATCH_SIZE as u64),
            "default config must inject DEFAULT_BATCH_SIZE as the `first:` variable"
        );
    }
}
/// An upstream that always returns the same `stuck` cursor must be abandoned
/// after two requests, yielding two records in total.
#[tokio::test(flavor = "multi_thread")]
async fn stuck_cursor_stops_without_extra_duplicate_page() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let server = MockServer::start().await;
    let hits = Arc::new(AtomicUsize::new(0));
    let responder_hits = Arc::clone(&hits);
    Mock::given(method("POST"))
        .and(path("/"))
        .respond_with(move |_req: &Request| {
            // Count the request, then always serve the same one-record page
            // with the same next cursor.
            responder_hits.fetch_add(1, Ordering::SeqCst);
            let body = make_page(0, 1, Some("stuck"));
            ResponseTemplate::new(200).set_body_json(body)
        })
        .mount(&server)
        .await;

    let source = GraphqlStream::new(relay_config(&server, 10));
    let records = source.fetch_all().await.unwrap();

    let request_count = hits.load(Ordering::SeqCst);
    assert_eq!(request_count, 2, "must stop after 2 requests");
    assert_eq!(records.len(), 2);
}