use faucet_core::observability::Labels;
use faucet_core::{Source, TransformStage, TransformingSource};
use faucet_source_rest::{
Auth, DEFAULT_TOKEN_ENDPOINT_EXPIRY_RATIO, FaucetError, PaginationStyle, RecordTransform,
ReplicationMethod, ResponseValidator, RestStream, RestStreamConfig,
};
use futures::StreamExt;
use serde_json::json;
use std::collections::HashMap;
use wiremock::matchers::{method, path, query_param};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// An un-paginated endpoint yields every record matched by `$.data[*]`.
#[tokio::test]
async fn test_single_page_fetch() {
    let mock = MockServer::start().await;
    let body = json!({
        "data": [
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"},
        ]
    });
    Mock::given(method("GET"))
        .and(path("/api/users"))
        .respond_with(ResponseTemplate::new(200).set_body_json(body))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/users")
        .records_path("$.data[*]")
        .pagination(PaginationStyle::None);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0]["name"], "Alice");
}
/// A `204 No Content` reply has no body and must produce zero records,
/// not a JSON decoding failure.
#[tokio::test]
async fn test_204_no_content_is_empty_page_not_error() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/users"))
        .respond_with(ResponseTemplate::new(204))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/users")
        .records_path("$.data[*]")
        .pagination(PaginationStyle::None);
    let rest = RestStream::new(config).unwrap();

    let rows = rest
        .fetch_all()
        .await
        .expect("204 must be treated as an empty page, not a JSON error");
    assert!(rows.is_empty());
}
/// A `200 OK` with an empty body is an empty page, not an error.
#[tokio::test]
async fn test_empty_body_200_is_empty_page_not_error() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/users"))
        .respond_with(ResponseTemplate::new(200))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/users")
        .records_path("$.data[*]")
        .pagination(PaginationStyle::None);
    let rest = RestStream::new(config).unwrap();

    let rows = rest
        .fetch_all()
        .await
        .expect("empty 200 body must be treated as an empty page");
    assert!(rows.is_empty());
}
/// The empty-body leniency must not extend to bodies that are present but
/// are not valid JSON.
#[tokio::test]
async fn test_malformed_nonempty_body_still_errors() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/users"))
        .respond_with(ResponseTemplate::new(200).set_body_string("this is not json"))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/users")
        .records_path("$.data[*]")
        .pagination(PaginationStyle::None);
    let rest = RestStream::new(config).unwrap();

    let outcome = rest.fetch_all().await;
    assert!(
        outcome.is_err(),
        "a non-empty, non-JSON body must surface as an error"
    );
}
/// Cursor pagination follows `$.next_cursor` via the `cursor` query parameter
/// until the cursor becomes `null`.
#[tokio::test]
async fn test_cursor_pagination() {
    let mock = MockServer::start().await;

    // First page: served exactly once, then this mock is exhausted.
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "items": [{"id": 1}, {"id": 2}],
            "next_cursor": "page2"
        })))
        .up_to_n_times(1)
        .mount(&mock)
        .await;

    // Final page: a null cursor terminates pagination.
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(query_param("cursor", "page2"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "items": [{"id": 3}],
            "next_cursor": null
        })))
        .mount(&mock)
        .await;

    let paging = PaginationStyle::Cursor {
        next_token_path: "$.next_cursor".into(),
        param_name: "cursor".into(),
    };
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/items")
        .records_path("$.items[*]")
        .pagination(paging);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 3);
}
/// `fetch_all_as` deserializes each extracted record into a caller-supplied
/// type. Every field of that type is asserted, so a mis-mapped field cannot
/// slip through unnoticed.
#[tokio::test]
async fn test_typed_deserialization() {
    #[derive(Debug, serde::Deserialize, PartialEq)]
    struct User {
        id: u64,
        name: String,
        email: String,
    }

    let server = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/users"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "data": [
                {"id": 1, "name": "Alice", "email": "alice@example.com"},
            ]
        })))
        .mount(&server)
        .await;

    let stream = RestStream::new(
        RestStreamConfig::new(&server.uri(), "/api/users").records_path("$.data[*]"),
    )
    .unwrap();

    let users: Vec<User> = stream.fetch_all_as().await.unwrap();
    assert_eq!(users.len(), 1);
    // Check every field, not just `name`. Otherwise a broken `id`/`email`
    // mapping would pass, and those never-read fields could trip `dead_code`.
    assert_eq!(users[0].id, 1);
    assert_eq!(users[0].name, "Alice");
    assert_eq!(users[0].email, "alice@example.com");
}
/// RFC 8288 `Link: <...>; rel="next"` headers drive pagination until a page
/// arrives without one.
#[tokio::test]
async fn test_link_header_pagination() {
    let mock = MockServer::start().await;
    let next_url = format!("{}/api/items?page=2", mock.uri());
    let link = format!(r#"<{next_url}>; rel="next""#);

    let first_page = ResponseTemplate::new(200)
        .set_body_json(json!({"items": [{"id": 1}, {"id": 2}]}))
        .append_header("link", link.as_str());
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .respond_with(first_page)
        .up_to_n_times(1)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(query_param("page", "2"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({"items": [{"id": 3}]})))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/items")
        .records_path("$.items[*]")
        .pagination(PaginationStyle::LinkHeader);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 3);
    assert_eq!(rows[0]["id"], 1);
    assert_eq!(rows[2]["id"], 3);
}
/// A full next-page URL embedded in the response body (`$.next_link`) is
/// followed until it becomes `null`.
#[tokio::test]
async fn test_next_link_in_body_pagination() {
    let mock = MockServer::start().await;
    let second_page = format!("{}/api/workers?page=2", mock.uri());

    Mock::given(method("GET"))
        .and(path("/api/workers"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "results": [{"id": 1}, {"id": 2}],
            "next_link": second_page,
        })))
        .up_to_n_times(1)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/workers"))
        .and(query_param("page", "2"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "results": [{"id": 3}],
            "next_link": null,
        })))
        .mount(&mock)
        .await;

    let paging = PaginationStyle::NextLinkInBody {
        next_link_path: "$.next_link".into(),
    };
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/workers")
        .records_path("$.results[*]")
        .pagination(paging);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 3);
    assert_eq!(rows[0]["id"], 1);
    assert_eq!(rows[2]["id"], 3);
}
/// `max_pages(3)` stops cursor pagination after three pages, even though the
/// server still advertises a fourth. The fourth mock must never be hit.
#[tokio::test]
async fn test_max_pages_enforced_for_cursor_pagination() {
    let mock = MockServer::start().await;

    Mock::given(method("GET"))
        .and(path("/api/items"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "items": [{"id": 1}],
            "next_cursor": "page2"
        })))
        .up_to_n_times(1)
        .mount(&mock)
        .await;

    // Intermediate pages, mounted in order: (requested cursor, record id, next cursor).
    for (cursor, id, next) in [("page2", 2, "page3"), ("page3", 3, "page4")] {
        Mock::given(method("GET"))
            .and(path("/api/items"))
            .and(query_param("cursor", cursor))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "items": [{"id": id}],
                "next_cursor": next
            })))
            .mount(&mock)
            .await;
    }

    // Beyond the page limit: verified on server drop to receive zero calls.
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(query_param("cursor", "page4"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "items": [{"id": 4}],
            "next_cursor": null
        })))
        .expect(0)
        .mount(&mock)
        .await;

    let paging = PaginationStyle::Cursor {
        next_token_path: "$.next_cursor".into(),
        param_name: "cursor".into(),
    };
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/items")
        .records_path("$.items[*]")
        .pagination(paging)
        .max_pages(3);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 3);
}
/// A stream configured with bearer auth issues exactly one request to the
/// protected endpoint and tolerates an empty `data` array.
#[tokio::test]
async fn test_bearer_auth_sent() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/secure"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({"data": []})))
        .expect(1)
        .mount(&mock)
        .await;

    let credentials = Auth::Bearer {
        token: "my-secret-token".into(),
    };
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/secure")
        .auth(credentials)
        .records_path("$.data[*]");
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert!(rows.is_empty());
}
/// `stream_pages` yields one item per HTTP page, in order, and then ends.
#[tokio::test]
async fn test_stream_pages_yields_per_page() {
    let mock = MockServer::start().await;

    Mock::given(method("GET"))
        .and(path("/api/items"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "items": [{"id": 1}, {"id": 2}],
            "next_cursor": "page2"
        })))
        .up_to_n_times(1)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(query_param("cursor", "page2"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "items": [{"id": 3}],
            "next_cursor": null
        })))
        .mount(&mock)
        .await;

    let paging = PaginationStyle::Cursor {
        next_token_path: "$.next_cursor".into(),
        param_name: "cursor".into(),
    };
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/items")
        .records_path("$.items[*]")
        .pagination(paging);
    let rest = RestStream::new(config).unwrap();

    let mut page_stream = rest.stream_pages();

    let first = page_stream.next().await.unwrap().unwrap();
    assert_eq!(first.len(), 2);
    assert_eq!(first[0]["id"], 1);

    let second = page_stream.next().await.unwrap().unwrap();
    assert_eq!(second.len(), 1);
    assert_eq!(second[0]["id"], 3);

    assert!(page_stream.next().await.is_none());
}
/// Incremental mode drops records whose replication key is not newer than
/// the start value. Here only `2024-12-01` survives a `2024-06-01` start.
#[tokio::test]
async fn test_incremental_replication_filters_old_records() {
    let mock = MockServer::start().await;
    let events = json!({
        "items": [
            {"id": 1, "updated_at": "2024-01-01"},
            {"id": 2, "updated_at": "2024-06-01"},
            {"id": 3, "updated_at": "2024-12-01"},
        ]
    });
    Mock::given(method("GET"))
        .and(path("/api/events"))
        .respond_with(ResponseTemplate::new(200).set_body_json(events))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/events")
        .records_path("$.items[*]")
        .replication_method(ReplicationMethod::Incremental)
        .replication_key("updated_at")
        .start_replication_value(json!("2024-06-01"));
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0]["id"], 3);
}
/// `fetch_all_incremental` returns every record plus a bookmark equal to the
/// largest replication-key value seen, regardless of arrival order.
#[tokio::test]
async fn test_fetch_all_incremental_returns_bookmark() {
    let mock = MockServer::start().await;
    let events = json!({
        "items": [
            {"id": 1, "updated_at": "2024-06-15"},
            {"id": 2, "updated_at": "2024-11-30"},
            {"id": 3, "updated_at": "2024-08-01"},
        ]
    });
    Mock::given(method("GET"))
        .and(path("/api/events"))
        .respond_with(ResponseTemplate::new(200).set_body_json(events))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/events")
        .records_path("$.items[*]")
        .replication_key("updated_at");
    let rest = RestStream::new(config).unwrap();

    let (rows, high_water_mark) = rest.fetch_all_incremental().await.unwrap();
    assert_eq!(rows.len(), 3);
    assert_eq!(high_water_mark.unwrap(), json!("2024-11-30"));
}
/// In full-table mode a configured start value is ignored. Every record is
/// returned, even ones older than the start value.
#[tokio::test]
async fn test_full_table_mode_does_not_filter() {
    let mock = MockServer::start().await;
    let events = json!({
        "items": [
            {"id": 1, "updated_at": "2023-01-01"},
            {"id": 2, "updated_at": "2024-01-01"},
        ]
    });
    Mock::given(method("GET"))
        .and(path("/api/events"))
        .respond_with(ResponseTemplate::new(200).set_body_json(events))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/events")
        .records_path("$.items[*]")
        .replication_method(ReplicationMethod::FullTable)
        .replication_key("updated_at")
        .start_replication_value(json!("2023-06-01"));
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 2);
}
/// Each partition substitutes its values into the `{org_id}` path template.
/// Results are concatenated in partition order.
#[tokio::test]
async fn test_partitions_fetch_each_context() {
    let mock = MockServer::start().await;

    Mock::given(method("GET"))
        .and(path("/api/orgs/acme/users"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "users": [{"id": 1, "org": "acme"}]
        })))
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/orgs/beta/users"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "users": [{"id": 2, "org": "beta"}, {"id": 3, "org": "beta"}]
        })))
        .mount(&mock)
        .await;

    let acme = HashMap::from([("org_id".to_string(), json!("acme"))]);
    let beta = HashMap::from([("org_id".to_string(), json!("beta"))]);

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/orgs/{org_id}/users")
        .records_path("$.users[*]")
        .add_partition(acme)
        .add_partition(beta);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 3);
    assert_eq!(rows[0]["org"], "acme");
    assert_eq!(rows[1]["org"], "beta");
}
/// A `429` carrying `Retry-After: 1` is retried, and the subsequent success
/// is returned to the caller.
#[tokio::test]
async fn test_429_retries_after_header_delay() {
    let mock = MockServer::start().await;

    let throttled = ResponseTemplate::new(429).append_header("retry-after", "1");
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .respond_with(throttled)
        .up_to_n_times(1)
        .mount(&mock)
        .await;

    let ok = ResponseTemplate::new(200).set_body_json(json!({"items": [{"id": 1}]}));
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .respond_with(ok)
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/items")
        .records_path("$.items[*]")
        .max_retries(3);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 1);
}
/// A status listed via `tolerate_http_error` yields an empty result instead
/// of an error.
#[tokio::test]
async fn test_tolerated_http_error_returns_empty_page() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/missing"))
        .respond_with(ResponseTemplate::new(404))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/missing").tolerate_http_error(404);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert!(rows.is_empty());
}
/// Without a matching `tolerate_http_error`, a 404 surfaces as an error.
#[tokio::test]
async fn test_untolerated_http_error_propagates() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/missing"))
        .respond_with(ResponseTemplate::new(404))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/missing").max_retries(0);
    let rest = RestStream::new(config).unwrap();

    let outcome = rest.fetch_all().await;
    assert!(outcome.is_err());
}
/// A tolerated status is only "empty" for a standalone request. When it
/// interrupts pagination after page one, the fetch must fail rather than
/// return a truncated dataset.
#[tokio::test]
async fn test_tolerated_error_midpagination_does_not_silently_truncate() {
    let mock = MockServer::start().await;

    Mock::given(method("GET"))
        .and(path("/api/items"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "items": [{"id": 1}],
            "next_cursor": "page2"
        })))
        .up_to_n_times(1)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(query_param("cursor", "page2"))
        .respond_with(ResponseTemplate::new(500))
        .mount(&mock)
        .await;

    let paging = PaginationStyle::Cursor {
        next_token_path: "$.next_cursor".into(),
        param_name: "cursor".into(),
    };
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/items")
        .records_path("$.items[*]")
        .pagination(paging)
        .tolerate_http_error(500)
        .max_retries(0);
    let rest = RestStream::new(config).unwrap();

    let result = rest.fetch_all().await;
    assert!(
        result.is_err(),
        "a tolerated error mid-pagination must not silently truncate; got {result:?}"
    );
}
/// The `name`, `primary_keys` and `schema` builder methods store their values
/// on the config's public fields.
#[test]
fn test_metadata_fields_builder() {
    let user_schema = json!({
        "type": "object",
        "properties": {
            "id": {"type": "integer"},
            "name": {"type": "string"}
        }
    });
    let config = RestStreamConfig::new("https://api.example.com", "/users")
        .name("users")
        .primary_keys(vec!["id".to_string()])
        .schema(user_schema);

    assert_eq!(config.name.as_deref(), Some("users"));
    assert_eq!(config.primary_keys, vec!["id"]);
    assert!(config.schema.is_some());
}
/// Schema inference derives JSON-Schema types from sampled records. A field
/// missing from some records (`email`) becomes a nullable string.
#[tokio::test]
async fn test_infer_schema_from_api_response() {
    let mock = MockServer::start().await;
    let users = json!({
        "data": [
            {"id": 1, "name": "Alice", "email": "alice@example.com", "score": 9.5},
            {"id": 2, "name": "Bob", "score": 8.0},
        ]
    });
    Mock::given(method("GET"))
        .and(path("/api/users"))
        .respond_with(ResponseTemplate::new(200).set_body_json(users))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/users").records_path("$.data[*]");
    let rest = RestStream::new(config).unwrap();

    let inferred = rest.infer_schema().await.unwrap();
    assert_eq!(inferred["type"], "object");

    let fields = &inferred["properties"];
    assert_eq!(fields["id"]["type"], "integer");
    assert_eq!(fields["name"]["type"], "string");
    assert_eq!(fields["score"]["type"], "number");

    // Either ordering of the union type is acceptable.
    let email_type = &fields["email"]["type"];
    let nullable_string = [json!(["null", "string"]), json!(["string", "null"])];
    assert!(
        nullable_string.contains(email_type),
        "expected nullable string for email, got {email_type}"
    );
}
#[tokio::test]
async fn test_infer_schema_returns_existing_schema_without_request() {
let explicit_schema = json!({
"type": "object",
"properties": {"id": {"type": "integer"}}
});
let stream = RestStream::new(
RestStreamConfig::new("http://localhost:19999", "/api/never-called")
.schema(explicit_schema.clone()),
)
.unwrap();
let result = stream.infer_schema().await.unwrap();
assert_eq!(result, explicit_schema);
}
/// With `schema_sample_size(2)`, the three records on page one are already
/// enough, so page two must never be requested.
#[tokio::test]
async fn test_infer_schema_sample_size_limits_requests() {
    let mock = MockServer::start().await;

    Mock::given(method("GET"))
        .and(path("/api/items"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "items": [
                {"id": 1, "updated_at": "2024-01-01"},
                {"id": 2, "updated_at": "2024-02-01"},
                {"id": 3, "updated_at": "2024-03-01"},
            ],
            "next_cursor": "page2"
        })))
        .up_to_n_times(1)
        .mount(&mock)
        .await;

    // Verified on server drop: sampling must stop before this page.
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(query_param("cursor", "page2"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "items": [{"id": 4, "updated_at": "2024-04-01"}],
            "next_cursor": null
        })))
        .expect(0)
        .mount(&mock)
        .await;

    let paging = PaginationStyle::Cursor {
        next_token_path: "$.next_cursor".into(),
        param_name: "cursor".into(),
    };
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/items")
        .records_path("$.items[*]")
        .pagination(paging)
        .schema_sample_size(2);
    let rest = RestStream::new(config).unwrap();

    let inferred = rest.infer_schema().await.unwrap();
    assert_eq!(inferred["type"], "object");
    assert_eq!(inferred["properties"]["id"]["type"], "integer");
}
/// A `Flatten` stage with separator `__` turns nested objects into
/// `parent__child` keys and removes the original nested key.
#[cfg(feature = "transform-flatten")]
#[tokio::test]
async fn test_flatten_transform_applied_to_records() {
    let mock = MockServer::start().await;
    let users = json!({
        "data": [
            {"id": 1, "address": {"city": "NYC", "zip": "10001"}},
            {"id": 2, "address": {"city": "LA", "zip": "90001"}},
        ]
    });
    Mock::given(method("GET"))
        .and(path("/api/users"))
        .respond_with(ResponseTemplate::new(200).set_body_json(users))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let rest = RestStream::new(RestStreamConfig::new(&base, "/api/users").records_path("$.data[*]"))
        .unwrap();
    let inner: Box<dyn Source> = Box::new(rest);

    let stages = vec![TransformStage::Map(RecordTransform::Flatten {
        separator: "__".into(),
    })];
    let transformed = TransformingSource::new(inner, stages, Labels::for_named("rest")).unwrap();

    let rows = transformed.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 2);

    let first = &rows[0];
    assert_eq!(first["id"], 1);
    assert_eq!(first["address__city"], "NYC");
    assert_eq!(first["address__zip"], "10001");
    assert!(first.get("address").is_none(), "nested key should be gone");
}
/// A `KeysCase { Snake }` stage rewrites space-separated, capitalised keys to
/// snake_case.
#[cfg(feature = "transform-keys-case")]
#[tokio::test]
async fn test_keys_case_snake_transform() {
    use faucet_core::KeyCaseMode;

    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/users"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "data": [{"First Name": "Alice", "Last Name": "Smith", "Price USD": 9.99}]
        })))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let rest = RestStream::new(RestStreamConfig::new(&base, "/api/users").records_path("$.data[*]"))
        .unwrap();
    let inner: Box<dyn Source> = Box::new(rest);

    let stages = vec![TransformStage::Map(RecordTransform::KeysCase {
        mode: KeyCaseMode::Snake,
    })];
    let transformed = TransformingSource::new(inner, stages, Labels::for_named("rest")).unwrap();

    let rows = transformed.fetch_all().await.unwrap();
    let first = &rows[0];
    assert_eq!(first["first_name"], "Alice");
    assert_eq!(first["last_name"], "Smith");
    assert_eq!(first["price_usd"], 9.99);
}
/// A `RenameKeys` stage applies a regex replacement to every key. Here it
/// strips a `_sdc_` prefix.
#[cfg(feature = "transform-rename-keys")]
#[tokio::test]
async fn test_rename_keys_transform() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/events"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "data": [{"_sdc_id": 1, "_sdc_name": "event_one"}]
        })))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let rest =
        RestStream::new(RestStreamConfig::new(&base, "/api/events").records_path("$.data[*]"))
            .unwrap();
    let inner: Box<dyn Source> = Box::new(rest);

    let stages = vec![TransformStage::Map(RecordTransform::RenameKeys {
        pattern: r"^_sdc_".into(),
        replacement: "".into(),
    })];
    let transformed = TransformingSource::new(inner, stages, Labels::for_named("rest")).unwrap();

    let rows = transformed.fetch_all().await.unwrap();
    assert_eq!(rows[0]["id"], 1);
    assert_eq!(rows[0]["name"], "event_one");
}
/// Stages run in declaration order. Snake-casing happens first, then
/// flattening joins the already-renamed keys with `_`.
#[cfg(all(feature = "transform-keys-case", feature = "transform-flatten"))]
#[tokio::test]
async fn test_chained_transforms_keys_case_then_flatten() {
    use faucet_core::KeyCaseMode;

    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/data"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "data": [{"User Info": {"First Name": "Alice"}}]
        })))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let rest = RestStream::new(RestStreamConfig::new(&base, "/api/data").records_path("$.data[*]"))
        .unwrap();
    let inner: Box<dyn Source> = Box::new(rest);

    let snake_case = TransformStage::Map(RecordTransform::KeysCase {
        mode: KeyCaseMode::Snake,
    });
    let flatten = TransformStage::Map(RecordTransform::Flatten {
        separator: "_".into(),
    });
    let transformed =
        TransformingSource::new(inner, vec![snake_case, flatten], Labels::for_named("rest"))
            .unwrap();

    let rows = transformed.fetch_all().await.unwrap();
    assert_eq!(rows[0]["user_info_first_name"], "Alice");
}
/// An invalid `RenameKeys` regex is rejected when the `TransformingSource` is
/// built, as a `FaucetError::Transform`, rather than on the first record.
#[cfg(feature = "transform-rename-keys")]
#[test]
fn test_invalid_regex_errors_at_construction() {
    let rest = RestStream::new(RestStreamConfig::new("http://localhost", "/api")).unwrap();
    let inner: Box<dyn Source> = Box::new(rest);

    let broken_rename = RecordTransform::RenameKeys {
        pattern: "[invalid".into(),
        replacement: "".into(),
    };
    let built = TransformingSource::new(
        inner,
        vec![TransformStage::Map(broken_rename)],
        Labels::for_named("rest"),
    );

    assert!(built.is_err());
    assert!(matches!(built, Err(FaucetError::Transform(_))));
}
/// A closure-based `RecordTransform::custom` stage runs on every record. It
/// doubles `value` and adds a `_source` tag.
#[tokio::test]
async fn test_custom_transform_applied_to_records() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "data": [{"id": 1, "value": 10}, {"id": 2, "value": 20}]
        })))
        .mount(&mock)
        .await;

    let base = mock.uri();
    let rest = RestStream::new(RestStreamConfig::new(&base, "/api/items").records_path("$.data[*]"))
        .unwrap();
    let inner: Box<dyn Source> = Box::new(rest);

    let stages = vec![TransformStage::Map(RecordTransform::custom(|mut record| {
        if let serde_json::Value::Object(ref mut fields) = record {
            // Only integer `value`s are doubled; anything else is left as-is.
            let doubled = fields
                .get("value")
                .and_then(|v| v.as_i64())
                .map(|v| json!(v * 2));
            if let Some(new_value) = doubled {
                fields.insert("value".to_string(), new_value);
            }
            fields.insert("_source".to_string(), json!("test-api"));
        }
        record
    }))];
    let transformed = TransformingSource::new(inner, stages, Labels::for_named("rest")).unwrap();

    let rows = transformed.fetch_all().await.unwrap();
    assert_eq!(rows[0]["value"], 20);
    assert_eq!(rows[1]["value"], 40);
    assert_eq!(rows[0]["_source"], "test-api");
    assert_eq!(rows[1]["_source"], "test-api");
}
/// `Auth::ApiKeyQuery` appends the key as a query parameter. The mock only
/// matches when `api_key=my-secret` is present and expects exactly one hit.
#[tokio::test]
async fn test_api_key_query_sent_as_param() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(query_param("api_key", "my-secret"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "data": [{"id": 1}]
        })))
        .expect(1)
        .mount(&mock)
        .await;

    let credentials = Auth::ApiKeyQuery {
        param: "api_key".into(),
        value: "my-secret".into(),
    };
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/items")
        .records_path("$.data[*]")
        .auth(credentials);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 1);
}
/// A non-retried HTTP failure surfaces as `FaucetError::HttpStatus` carrying
/// the status code, the response body and the request URL.
#[tokio::test]
async fn test_http_error_includes_response_body() {
    let mock = MockServer::start().await;
    let rejection = ResponseTemplate::new(422)
        .set_body_string(r#"{"error": "validation failed", "field": "email"}"#);
    Mock::given(method("GET"))
        .and(path("/api/fail"))
        .respond_with(rejection)
        .mount(&mock)
        .await;

    let base = mock.uri();
    let rest = RestStream::new(RestStreamConfig::new(&base, "/api/fail").max_retries(0)).unwrap();

    let err = rest.fetch_all().await.unwrap_err();
    if let FaucetError::HttpStatus { status, body, url } = &err {
        assert_eq!(*status, 422);
        assert!(body.contains("validation failed"));
        assert!(url.contains("/api/fail"));
    } else {
        panic!("expected HttpStatus, got: {err:?}");
    }
}
/// Two transient 500s followed by a 200 succeed when up to three retries are
/// allowed.
#[tokio::test]
async fn test_5xx_retries_then_succeeds() {
    let mock = MockServer::start().await;

    Mock::given(method("GET"))
        .and(path("/api/flaky"))
        .respond_with(ResponseTemplate::new(500).set_body_string("server error"))
        .up_to_n_times(2)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/flaky"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({"data": [{"id": 1}]})))
        .mount(&mock)
        .await;

    // Keep the backoff tiny so the retries don't slow the suite.
    let backoff = std::time::Duration::from_millis(1);
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/flaky")
        .records_path("$.data[*]")
        .max_retries(3)
        .retry_backoff(backoff);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 1);
}
/// A 400 is not retried, even with retries enabled. The mock asserts exactly
/// one request when the server is dropped.
#[tokio::test]
async fn test_4xx_does_not_retry() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/bad"))
        .respond_with(ResponseTemplate::new(400).set_body_string("bad request"))
        .expect(1)
        .mount(&mock)
        .await;

    let backoff = std::time::Duration::from_millis(1);
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/bad")
        .max_retries(3)
        .retry_backoff(backoff);
    let rest = RestStream::new(config).unwrap();

    let outcome = rest.fetch_all().await;
    assert!(outcome.is_err());
}
/// A server that keeps returning the same cursor is detected as a loop.
/// Pagination stops after the repeat (two pages) instead of running until
/// `max_pages`.
#[tokio::test]
async fn test_cursor_loop_detection_stops_fetching() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/stuck"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "items": [{"id": 1}],
            "cursor": "same-token"
        })))
        .mount(&mock)
        .await;

    let paging = PaginationStyle::Cursor {
        next_token_path: "$.cursor".into(),
        param_name: "cursor".into(),
    };
    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/stuck")
        .records_path("$.items[*]")
        .pagination(paging)
        .max_pages(100);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 2);
}
/// `Auth::TokenEndpoint` POSTs once to the token URL, extracts
/// `$.access_token`, and sends it as a bearer token on the data request.
#[tokio::test]
async fn test_token_endpoint_auth_fetches_and_uses_token() {
    use reqwest::header::HeaderMap;
    use wiremock::matchers::header;

    let mock = MockServer::start().await;
    let base = mock.uri();

    Mock::given(method("POST"))
        .and(path("/auth/token"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "access_token": "fetched-secret-token",
            "expires_in": 3600
        })))
        .expect(1)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/data"))
        .and(header("authorization", "Bearer fetched-secret-token"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!([{"id": 1}, {"id": 2}])))
        .expect(1)
        .mount(&mock)
        .await;

    let token_auth = Auth::TokenEndpoint {
        url: format!("{base}/auth/token"),
        method: reqwest::Method::POST,
        headers: HeaderMap::new(),
        body: None,
        token_path: "$.access_token".into(),
        expiry_path: Some("$.expires_in".into()),
        expiry_ratio: DEFAULT_TOKEN_ENDPOINT_EXPIRY_RATIO,
        response_validator: None,
    };
    let rest = RestStream::new(RestStreamConfig::new(&base, "/api/data").auth(token_auth)).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0]["id"], 1);
}
/// Custom headers and a JSON body are forwarded to the token endpoint, and a
/// nested `token_path` (`$.result.token`) is honoured.
#[tokio::test]
async fn test_token_endpoint_auth_with_custom_headers_and_body() {
    use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
    use wiremock::matchers::header;

    let mock = MockServer::start().await;
    let base = mock.uri();

    Mock::given(method("POST"))
        .and(path("/auth/login"))
        .and(header("x-api-key", "setup-key"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "result": {
                "token": "dynamic-bearer-value"
            }
        })))
        .expect(1)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(header("authorization", "Bearer dynamic-bearer-value"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!([{"name": "item1"}])))
        .expect(1)
        .mount(&mock)
        .await;

    let mut login_headers = HeaderMap::new();
    login_headers.insert(
        HeaderName::from_static("x-api-key"),
        HeaderValue::from_static("setup-key"),
    );
    let token_auth = Auth::TokenEndpoint {
        url: format!("{base}/auth/login"),
        method: reqwest::Method::POST,
        headers: login_headers,
        body: Some(json!({"username": "admin", "password": "secret"})),
        token_path: "$.result.token".into(),
        expiry_path: None,
        expiry_ratio: DEFAULT_TOKEN_ENDPOINT_EXPIRY_RATIO,
        response_validator: None,
    };
    let rest =
        RestStream::new(RestStreamConfig::new(&base, "/api/items").auth(token_auth)).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0]["name"], "item1");
}
/// A token fetched for the first page is reused for later pages. The token
/// mock's `expect(1)` fails on drop if it is fetched again.
#[tokio::test]
async fn test_token_endpoint_auth_caches_token_across_pages() {
    use reqwest::header::HeaderMap;
    use wiremock::matchers::header;

    let mock = MockServer::start().await;
    let base = mock.uri();

    Mock::given(method("POST"))
        .and(path("/auth/token"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "token": "cached-token",
            "ttl": 3600
        })))
        .expect(1)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(header("authorization", "Bearer cached-token"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "data": [{"id": 1}],
            "next_cursor": "page2"
        })))
        .up_to_n_times(1)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(query_param("cursor", "page2"))
        .and(header("authorization", "Bearer cached-token"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "data": [{"id": 2}],
            "next_cursor": null
        })))
        .mount(&mock)
        .await;

    let paging = PaginationStyle::Cursor {
        next_token_path: "$.next_cursor".into(),
        param_name: "cursor".into(),
    };
    let token_auth = Auth::TokenEndpoint {
        url: format!("{base}/auth/token"),
        method: reqwest::Method::POST,
        headers: HeaderMap::new(),
        body: None,
        token_path: "$.token".into(),
        expiry_path: Some("$.ttl".into()),
        expiry_ratio: DEFAULT_TOKEN_ENDPOINT_EXPIRY_RATIO,
        response_validator: None,
    };
    let config = RestStreamConfig::new(&base, "/api/items")
        .records_path("$.data[*]")
        .pagination(paging)
        .auth(token_auth);
    let rest = RestStream::new(config).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 2);
}
/// A failing token endpoint (401) surfaces as `FaucetError::Auth`, and the
/// message mentions the status code.
#[tokio::test]
async fn test_token_endpoint_auth_error_on_failed_fetch() {
    use reqwest::header::HeaderMap;

    let mock = MockServer::start().await;
    let base = mock.uri();

    Mock::given(method("POST"))
        .and(path("/auth/token"))
        .respond_with(ResponseTemplate::new(401).set_body_string("Unauthorized"))
        .mount(&mock)
        .await;

    let token_auth = Auth::TokenEndpoint {
        url: format!("{base}/auth/token"),
        method: reqwest::Method::POST,
        headers: HeaderMap::new(),
        body: None,
        token_path: "$.token".into(),
        expiry_path: None,
        expiry_ratio: DEFAULT_TOKEN_ENDPOINT_EXPIRY_RATIO,
        response_validator: None,
    };
    let rest = RestStream::new(RestStreamConfig::new(&base, "/api/data").auth(token_auth)).unwrap();

    let err = rest.fetch_all().await.unwrap_err();
    if let FaucetError::Auth(msg) = &err {
        assert!(msg.contains("401"), "expected 401 in error: {msg}");
    } else {
        panic!("expected Auth error, got: {err:?}");
    }
}
/// A custom `ResponseValidator` can accept non-200 token responses; here a
/// `202 Accepted` is allowed.
#[tokio::test]
async fn test_token_endpoint_custom_response_validator() {
    use reqwest::header::HeaderMap;
    use wiremock::matchers::header;

    let mock = MockServer::start().await;
    let base = mock.uri();

    Mock::given(method("POST"))
        .and(path("/auth/token"))
        .respond_with(ResponseTemplate::new(202).set_body_json(json!({"token": "accepted-token"})))
        .expect(1)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/api/data"))
        .and(header("authorization", "Bearer accepted-token"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!([{"id": 1}])))
        .expect(1)
        .mount(&mock)
        .await;

    let accept_ok_or_accepted = ResponseValidator::new(|status| status == 200 || status == 202);
    let token_auth = Auth::TokenEndpoint {
        url: format!("{base}/auth/token"),
        method: reqwest::Method::POST,
        headers: HeaderMap::new(),
        body: None,
        token_path: "$.token".into(),
        expiry_path: None,
        expiry_ratio: DEFAULT_TOKEN_ENDPOINT_EXPIRY_RATIO,
        response_validator: Some(accept_ok_or_accepted),
    };
    let rest = RestStream::new(RestStreamConfig::new(&base, "/api/data").auth(token_auth)).unwrap();

    let rows = rest.fetch_all().await.unwrap();
    assert_eq!(rows.len(), 1);
}
/// A custom validator that only accepts 201 rejects an otherwise-successful
/// 200 as an `Auth` error that names the received status.
#[tokio::test]
async fn test_token_endpoint_custom_validator_rejects_response() {
    use reqwest::header::HeaderMap;

    let mock = MockServer::start().await;
    let base = mock.uri();

    Mock::given(method("POST"))
        .and(path("/auth/token"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({"token": "t"})))
        .mount(&mock)
        .await;

    let only_created = ResponseValidator::new(|status| status == 201);
    let token_auth = Auth::TokenEndpoint {
        url: format!("{base}/auth/token"),
        method: reqwest::Method::POST,
        headers: HeaderMap::new(),
        body: None,
        token_path: "$.token".into(),
        expiry_path: None,
        expiry_ratio: DEFAULT_TOKEN_ENDPOINT_EXPIRY_RATIO,
        response_validator: Some(only_created),
    };
    let rest = RestStream::new(RestStreamConfig::new(&base, "/api/data").auth(token_auth)).unwrap();

    let err = rest.fetch_all().await.unwrap_err();
    if let FaucetError::Auth(msg) = &err {
        assert!(msg.contains("200"), "expected 200 in error: {msg}");
    } else {
        panic!("expected Auth error, got: {err:?}");
    }
}
/// `fetch_with_context` fills `{org_id}` in the path template from the
/// supplied context map.
#[tokio::test]
async fn test_fetch_with_context_substitutes_path() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/orgs/acme/users"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!([{"id": 1, "name": "Alice"}])))
        .expect(1)
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config =
        RestStreamConfig::new(&base, "/orgs/{org_id}/users").pagination(PaginationStyle::None);
    let rest = RestStream::new(config).unwrap();

    let ctx = HashMap::from([("org_id".to_string(), json!("acme"))]);
    let rows = rest.fetch_with_context(&ctx).await.unwrap();
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0]["name"], "Alice");
}
/// Context values are substituted into query-parameter templates as well as
/// paths.
#[tokio::test]
async fn test_fetch_with_context_substitutes_query_params() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .and(query_param("org", "acme"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!([{"id": 1}])))
        .expect(1)
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/items")
        .query("org", "{org_id}")
        .pagination(PaginationStyle::None);
    let rest = RestStream::new(config).unwrap();

    let ctx = HashMap::from([("org_id".to_string(), json!("acme"))]);
    let rows = rest.fetch_with_context(&ctx).await.unwrap();
    assert_eq!(rows.len(), 1);
}
/// A caller context (`org_id`) is merged with each configured partition
/// (`repo`), so one call fans out to every partition under that org.
#[tokio::test]
async fn test_fetch_with_context_merges_with_partitions() {
    let mock = MockServer::start().await;

    Mock::given(method("GET"))
        .and(path("/orgs/acme/repos/alpha/issues"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!([{"id": 1}])))
        .expect(1)
        .mount(&mock)
        .await;

    Mock::given(method("GET"))
        .and(path("/orgs/acme/repos/beta/issues"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!([{"id": 2}])))
        .expect(1)
        .mount(&mock)
        .await;

    let alpha = HashMap::from([("repo".to_string(), json!("alpha"))]);
    let beta = HashMap::from([("repo".to_string(), json!("beta"))]);

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/orgs/{org_id}/repos/{repo}/issues")
        .pagination(PaginationStyle::None)
        .add_partition(alpha)
        .add_partition(beta);
    let rest = RestStream::new(config).unwrap();

    let ctx = HashMap::from([("org_id".to_string(), json!("acme"))]);
    let rows = rest.fetch_with_context(&ctx).await.unwrap();
    assert_eq!(rows.len(), 2);
}
/// An empty context makes `fetch_with_context` behave like a plain fetch
/// against the configured path.
#[tokio::test]
async fn test_fetch_with_empty_context_uses_fetch_all() {
    let mock = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/items"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!([{"id": 1}])))
        .expect(1)
        .mount(&mock)
        .await;

    let base = mock.uri();
    let config = RestStreamConfig::new(&base, "/api/items").pagination(PaginationStyle::None);
    let rest = RestStream::new(config).unwrap();

    let no_context = HashMap::new();
    let rows = rest.fetch_with_context(&no_context).await.unwrap();
    assert_eq!(rows.len(), 1);
}