#![cfg(feature = "http-client")]
mod common;
use common::{default_test_timeout, run_engine, wait_for_requests};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_multiple_sources_to_single_blackhole() {
if common::skip_if_no_network() {
return;
}
let mock_server_a = MockServer::start().await;
let mock_server_b = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({"source": "A"})))
.expect(1..)
.mount(&mock_server_a)
.await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({"source": "B"})))
.expect(1..)
.mount(&mock_server_b)
.await;
let yaml = format!(
r#"
pipeline:
sources:
- id: source_a
type: http_client
config:
url: "{}"
interval: "100ms"
- id: source_b
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source_a, source_b]
outputs: [discard_all]
sinks:
- id: discard_all
type: blackhole
"#,
mock_server_a.uri(),
mock_server_b.uri()
);
let (shutdown_tx, engine_handle) = run_engine(&yaml).await;
wait_for_requests(&mock_server_a, 1, default_test_timeout()).await;
wait_for_requests(&mock_server_b, 1, default_test_timeout()).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(
result.is_ok(),
"Multi-source pipeline should run without error"
);
}
/// A single fast-polling `http_client` source feeds a `blackhole` sink.
///
/// One mock HTTP server answers `GET /` with a fixed JSON sample and expects
/// at least ten hits (`10..`). The pipeline config sets the source interval to
/// `50ms`; the test waits on the server via `wait_for_requests` with a count
/// of 10, signals shutdown, and asserts that the engine task finished with
/// `Ok`.
///
/// The test returns immediately when `common::skip_if_no_network()` is true.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_blackhole_stability_continuous_load() {
    if common::skip_if_no_network() {
        return;
    }

    let upstream = MockServer::start().await;

    // Fixed payload served on every poll.
    let sample_body = serde_json::json!({
        "timestamp": "2026-01-07T12:00:00Z",
        "value": 42
    });
    let reply = ResponseTemplate::new(200).set_body_json(sample_body);
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(reply)
        .expect(10..)
        .mount(&upstream)
        .await;

    let upstream_uri = upstream.uri();
    // The raw string is kept verbatim: every character between the quotes,
    // including leading whitespace on each line, is part of the config text.
    let pipeline_yaml = format!(
        r#"
pipeline:
sources:
- id: continuous_source
type: http_client
config:
url: "{}"
interval: "50ms"
transforms:
- id: pass_through
inputs: [continuous_source]
outputs: [blackhole]
sinks:
- id: blackhole
type: blackhole
"#,
        upstream_uri
    );

    let (stop_tx, engine_task) = run_engine(&pipeline_yaml).await;

    // Presumably blocks until ten requests have been observed or the timeout
    // elapses — confirm against `common::wait_for_requests`.
    wait_for_requests(&upstream, 10, default_test_timeout()).await;

    // Best-effort shutdown signal: a failed send is deliberately ignored.
    let _ = stop_tx.send(());

    let joined = engine_task.await;
    let outcome = joined.expect("Engine task panicked");
    assert!(
        outcome.is_ok(),
        "Blackhole should remain stable under continuous load"
    );
}