#![cfg(feature = "http-client")]
mod common;
use common::{default_test_timeout, run_engine, wait_for_requests};
use wiremock::matchers::{body_json, header, method, path};
use wiremock::{Match, Mock, MockServer, Request, ResponseTemplate};
/// End-to-end check of the HTTP sink's basic POST path.
///
/// A mock source serves a fixed JSON document on `GET /data`; the pipeline is
/// configured to poll it and forward through `pass_through` to an HTTP sink
/// pointed at `/webhook`. The sink mock only matches a `POST /webhook` whose
/// JSON body equals the document served by the source, and both mocks expect
/// at least one matching request.
///
/// Returns early when `common::skip_if_no_network()` reports `true`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_sink_basic_post() {
    if common::skip_if_no_network() {
        return;
    }

    // The same document is served by the source and required by the sink matcher.
    let payload = serde_json::json!({
        "event": "test",
        "value": 42
    });

    let upstream = MockServer::start().await;
    let downstream = MockServer::start().await;

    let source_mock = Mock::given(method("GET"))
        .and(path("/data"))
        .respond_with(ResponseTemplate::new(200).set_body_json(payload.clone()))
        .expect(1..);
    source_mock.mount(&upstream).await;

    let sink_mock = Mock::given(method("POST"))
        .and(path("/webhook"))
        .and(body_json(payload))
        .respond_with(ResponseTemplate::new(200))
        .expect(1..);
    sink_mock.mount(&downstream).await;

    // The raw string is runtime configuration; its lines are intentionally
    // left exactly as they are so the rendered YAML does not change.
    let pipeline_yaml = format!(
        r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}/data"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [http_sink]
sinks:
- id: http_sink
type: http_client
config:
url: "{}/webhook"
method: POST
"#,
        upstream.uri(),
        downstream.uri()
    );

    let (stop_tx, engine_task) = run_engine(&pipeline_yaml).await;

    // NOTE(review): `wait_for_requests` lives in `common`; presumably it waits
    // until the server has seen the requested number of calls or the timeout
    // elapses — confirm against the helper.
    wait_for_requests(&downstream, 1, default_test_timeout()).await;

    // The result of the shutdown send is deliberately discarded.
    let _ = stop_tx.send(());

    let outcome = engine_task.await.expect("Engine task panicked");
    assert!(outcome.is_ok(), "Engine should run without error");
}
/// End-to-end check of the HTTP sink's `fields` mapping configuration.
///
/// The source mock serves a nested `data.user` document on `GET /users`. The
/// sink is configured with three fields: two taken from paths in the source
/// document (`$.data.user.id`, `$.data.user.name`) and one literal
/// (`event_type`). The sink mock only matches a `POST /events` whose JSON body
/// carries all three expected top-level entries.
///
/// Returns early when `common::skip_if_no_network()` reports `true`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_sink_with_field_mappings() {
    if common::skip_if_no_network() {
        return;
    }

    let upstream = MockServer::start().await;
    let downstream = MockServer::start().await;

    let user_document = serde_json::json!({
        "data": {
            "user": {
                "id": 123,
                "name": "Alice"
            }
        }
    });
    let source_mock = Mock::given(method("GET"))
        .and(path("/users"))
        .respond_with(ResponseTemplate::new(200).set_body_json(user_document))
        .expect(1..);
    source_mock.mount(&upstream).await;

    // Each pair becomes one `BodyContains` matcher, attached in this order.
    let expected_fields = [
        ("user_id", serde_json::json!(123)),
        ("user_name", serde_json::json!("Alice")),
        ("event_type", serde_json::json!("user_update")),
    ];
    let mut sink_mock = Mock::given(method("POST")).and(path("/events"));
    for (field, expected) in expected_fields {
        sink_mock = sink_mock.and(BodyContains::new(field, expected));
    }
    sink_mock
        .respond_with(ResponseTemplate::new(200))
        .expect(1..)
        .mount(&downstream)
        .await;

    // The raw string is runtime configuration; its lines are intentionally
    // left exactly as they are so the rendered YAML does not change.
    // NOTE(review): no `method` is configured for the sink while the mock only
    // accepts POST — presumably POST is the sink's default; confirm.
    let pipeline_yaml = format!(
        r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}/users"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [http_sink]
sinks:
- id: http_sink
type: http_client
config:
url: "{}/events"
fields:
- name: user_id
from: "$.data.user.id"
- name: user_name
from: "$.data.user.name"
- name: event_type
value: "user_update"
"#,
        upstream.uri(),
        downstream.uri()
    );

    let (stop_tx, engine_task) = run_engine(&pipeline_yaml).await;
    wait_for_requests(&downstream, 1, default_test_timeout()).await;

    // The result of the shutdown send is deliberately discarded.
    let _ = stop_tx.send(());

    let outcome = engine_task.await.expect("Engine task panicked");
    assert!(outcome.is_ok(), "Engine should run without error");
}
/// End-to-end check that headers configured on the HTTP sink are sent.
///
/// The sink mock only matches a `POST /api` carrying both
/// `Authorization: Bearer test-token` and `X-Custom-Header: custom-value`,
/// which are the two entries listed under `headers` in the sink configuration.
///
/// Returns early when `common::skip_if_no_network()` reports `true`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_sink_with_headers() {
    if common::skip_if_no_network() {
        return;
    }

    let upstream = MockServer::start().await;
    let downstream = MockServer::start().await;

    let source_reply =
        ResponseTemplate::new(200).set_body_json(serde_json::json!({"ok": true}));
    let source_mock = Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(source_reply)
        .expect(1..);
    source_mock.mount(&upstream).await;

    let sink_mock = Mock::given(method("POST"))
        .and(path("/api"))
        .and(header("Authorization", "Bearer test-token"))
        .and(header("X-Custom-Header", "custom-value"))
        .respond_with(ResponseTemplate::new(200))
        .expect(1..);
    sink_mock.mount(&downstream).await;

    // The raw string is runtime configuration; its lines are intentionally
    // left exactly as they are so the rendered YAML does not change.
    let pipeline_yaml = format!(
        r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [http_sink]
sinks:
- id: http_sink
type: http_client
config:
url: "{}/api"
headers:
Authorization: "Bearer test-token"
X-Custom-Header: "custom-value"
"#,
        upstream.uri(),
        downstream.uri()
    );

    let (stop_tx, engine_task) = run_engine(&pipeline_yaml).await;
    wait_for_requests(&downstream, 1, default_test_timeout()).await;

    // The result of the shutdown send is deliberately discarded.
    let _ = stop_tx.send(());

    let outcome = engine_task.await.expect("Engine task panicked");
    assert!(outcome.is_ok(), "Engine should run without error");
}
/// End-to-end check that the HTTP sink honours `method: PUT`.
///
/// The sink mock only matches `PUT /resource` and expects at least one such
/// request; the source mock serves `{"id": 1}` on `GET /`.
///
/// Returns early when `common::skip_if_no_network()` reports `true`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_sink_put_method() {
    if common::skip_if_no_network() {
        return;
    }

    let upstream = MockServer::start().await;
    let downstream = MockServer::start().await;

    let source_reply = ResponseTemplate::new(200).set_body_json(serde_json::json!({"id": 1}));
    let source_mock = Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(source_reply)
        .expect(1..);
    source_mock.mount(&upstream).await;

    let sink_mock = Mock::given(method("PUT"))
        .and(path("/resource"))
        .respond_with(ResponseTemplate::new(200))
        .expect(1..);
    sink_mock.mount(&downstream).await;

    // The raw string is runtime configuration; its lines are intentionally
    // left exactly as they are so the rendered YAML does not change.
    let pipeline_yaml = format!(
        r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [http_sink]
sinks:
- id: http_sink
type: http_client
config:
url: "{}/resource"
method: PUT
"#,
        upstream.uri(),
        downstream.uri()
    );

    let (stop_tx, engine_task) = run_engine(&pipeline_yaml).await;
    wait_for_requests(&downstream, 1, default_test_timeout()).await;

    // The result of the shutdown send is deliberately discarded.
    let _ = stop_tx.send(());

    let outcome = engine_task.await.expect("Engine task panicked");
    assert!(outcome.is_ok(), "Engine should run without error");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_sink_error_handling() {
if common::skip_if_no_network() {
return;
}
let source_server = MockServer::start().await;
let sink_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({"data": "test"})))
.expect(3..)
.mount(&source_server)
.await;
Mock::given(method("POST"))
.and(path("/webhook"))
.respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error"))
.up_to_n_times(2)
.mount(&sink_server)
.await;
Mock::given(method("POST"))
.and(path("/webhook"))
.respond_with(ResponseTemplate::new(200))
.expect(1..)
.mount(&sink_server)
.await;
let yaml = format!(
r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [http_sink]
sinks:
- id: http_sink
type: http_client
config:
url: "{}/webhook"
"#,
source_server.uri(),
sink_server.uri()
);
let (shutdown_tx, engine_handle) = run_engine(&yaml).await;
wait_for_requests(&sink_server, 3, default_test_timeout()).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(
result.is_ok(),
"Pipeline should complete despite sink errors"
);
}
/// Request matcher that accepts a request only when its JSON body holds
/// `value` under the top-level entry `key`.
struct BodyContains {
    /// Key looked up in the parsed request body.
    key: String,
    /// Value the entry found under `key` must be equal to.
    value: serde_json::Value,
}

impl BodyContains {
    /// Creates a matcher for the given key/value pair.
    ///
    /// `key` accepts anything convertible into a `String` and is stored owned.
    fn new(key: impl Into<String>, value: serde_json::Value) -> Self {
        let key = key.into();
        Self { key, value }
    }
}
impl Match for BodyContains {
    /// Returns `true` only when the request body parses as JSON and the entry
    /// stored under `self.key` equals `self.value`.
    ///
    /// A body that fails to parse as JSON never matches.
    fn matches(&self, request: &Request) -> bool {
        match serde_json::from_slice::<serde_json::Value>(&request.body) {
            Ok(parsed) => parsed.get(&self.key) == Some(&self.value),
            Err(_) => false,
        }
    }
}