// pipeflow 0.0.4
//
// A lightweight, configuration-driven data pipeline framework.
// Documentation
//! E2E tests for HTTP Client Authentication
//!
//! Verifies that authentication configurations are correctly applied
//! in a full pipeline execution.

#![cfg(feature = "http-client")]

mod common;

use common::{default_test_timeout, run_engine, wait_for_requests};
use wiremock::matchers::{header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

/// Runs a source -> transform -> sink pipeline against two mock HTTP servers
/// and checks that each side sends the credentials configured for it.
///
/// * The source mock only matches `GET /source-data` requests whose
///   `Authorization` header is `Basic dXNlcjpwYXNz` (base64 of `user:pass`).
/// * The sink mock only matches `POST /sink-endpoint` requests whose
///   `Authorization` header is `Bearer sink-token-123`.
///
/// Both mocks declare `.expect(1..)`, i.e. at least one matching request.
/// The test additionally requires the engine task to finish with `Ok` after
/// the shutdown signal is sent.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_authenticated_pipeline() {
    // Shared helper decides whether this environment should skip the test.
    if common::skip_if_no_network() {
        return;
    }

    let source_server = MockServer::start().await;
    let sink_server = MockServer::start().await;

    // Source endpoint: answers with a small JSON document, but only when the
    // request carries the expected Basic credentials.
    let source_payload = serde_json::json!({
        "data": "secure-content"
    });
    let source_mock = Mock::given(method("GET"))
        .and(path("/source-data"))
        .and(header("Authorization", "Basic dXNlcjpwYXNz")) // user:pass
        .respond_with(ResponseTemplate::new(200).set_body_json(source_payload))
        .expect(1..);
    source_mock.mount(&source_server).await;

    // Sink endpoint: accepts the POST only when the bearer token matches.
    let sink_mock = Mock::given(method("POST"))
        .and(path("/sink-endpoint"))
        .and(header("Authorization", "Bearer sink-token-123"))
        .respond_with(ResponseTemplate::new(200))
        .expect(1..);
    sink_mock.mount(&sink_server).await;

    // Pipeline definition; the two `{}` placeholders receive the base URIs of
    // the source and sink mock servers, in that order.
    let source_base = source_server.uri();
    let sink_base = sink_server.uri();
    let pipeline_yaml = format!(
        r#"
pipeline:
  sources:
    - id: auth_source
      type: http_client
      config:
        url: "{}/source-data"
        interval: "100ms"
        auth:
          type: basic
          username: "user"
          password: "pass"

  transforms:
    - id: pass_through
      inputs: [auth_source]
      outputs: [auth_sink]

  sinks:
    - id: auth_sink
      type: http_client
      config:
        url: "{}/sink-endpoint"
        method: "POST"
        auth:
          type: bearer
          token: "sink-token-123"
"#,
        source_base, sink_base
    );

    // Start the engine with the generated configuration.
    let (shutdown_tx, engine_handle) = run_engine(&pipeline_yaml).await;

    // Wait until each server has seen at least one request: first the source
    // (the poll), then the sink (the forwarded data).
    for server in [&source_server, &sink_server] {
        wait_for_requests(server, 1, default_test_timeout()).await;
    }

    // Request shutdown; a failed send is deliberately ignored because the
    // engine's own result is inspected right below.
    let _ = shutdown_tx.send(());
    let engine_outcome = engine_handle.await.expect("Engine task panicked");

    assert!(engine_outcome.is_ok(), "Engine should run without error");
}