#![cfg(feature = "http-client")]
mod common;
use common::{default_test_timeout, run_engine, wait_for_requests};
use wiremock::matchers::{header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// End-to-end test: an `http_client` source using HTTP Basic auth feeds an
/// `http_client` sink using Bearer auth, with both endpoints played by local
/// wiremock servers.
///
/// The test returns early (without failing) when `common::skip_if_no_network()`
/// reports `true`. Otherwise it mounts one authenticated mock per server,
/// starts the engine with a YAML pipeline pointing at those servers, waits for
/// traffic on both, signals shutdown and asserts that the engine task returned
/// `Ok`.
///
/// # Panics
///
/// Panics if the engine task itself panicked or returned an error. The
/// `expect(1..)` expectations on the mocks are checked by wiremock when the
/// servers are dropped at the end of the test.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_authenticated_pipeline() {
    if common::skip_if_no_network() {
        return;
    }

    let mock_source = MockServer::start().await;
    let mock_sink = MockServer::start().await;

    // Source endpoint: only answers GET requests carrying the Basic credentials.
    // "dXNlcjpwYXNz" is base64("user:pass"), i.e. the username/password
    // configured for `auth_source` in the YAML below.
    Mock::given(method("GET"))
        .and(path("/source-data"))
        .and(header("Authorization", "Basic dXNlcjpwYXNz"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "data": "secure-content"
        })))
        .expect(1..)
        .mount(&mock_source)
        .await;

    // Sink endpoint: only answers POST requests carrying the configured bearer token.
    Mock::given(method("POST"))
        .and(path("/sink-endpoint"))
        .and(header("Authorization", "Bearer sink-token-123"))
        .respond_with(ResponseTemplate::new(200))
        .expect(1..)
        .mount(&mock_sink)
        .await;

    // YAML nesting is expressed purely through indentation, so the document is
    // kept at column 0 inside the raw string, independent of the Rust indent.
    // NOTE(review): the nesting below (auth/interval/method under `config`,
    // transforms and sinks under `pipeline`) is reconstructed from key order —
    // confirm against the engine's config schema.
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: auth_source
      type: http_client
      config:
        url: "{}/source-data"
        interval: "100ms"
        auth:
          type: basic
          username: "user"
          password: "pass"
  transforms:
    - id: pass_through
      inputs: [auth_source]
      outputs: [auth_sink]
  sinks:
    - id: auth_sink
      type: http_client
      config:
        url: "{}/sink-endpoint"
        method: "POST"
        auth:
          type: bearer
          token: "sink-token-123"
"#,
        mock_source.uri(),
        mock_sink.uri()
    );

    let (shutdown_tx, engine_handle) = run_engine(&yaml).await;

    // Presumably blocks until each server has seen at least one request or the
    // timeout elapses — see `common::wait_for_requests` for exact semantics.
    wait_for_requests(&mock_source, 1, default_test_timeout()).await;
    wait_for_requests(&mock_sink, 1, default_test_timeout()).await;

    // A failed send is deliberately ignored: the engine's outcome is checked
    // through its join handle right below.
    let _ = shutdown_tx.send(());

    let result = engine_handle.await.expect("Engine task panicked");
    assert!(result.is_ok(), "Engine should run without error");
}