pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! E2E tests for HTTP Client Sink
//!
//! These tests verify HTTP sink functionality:
//! - Basic POST/PUT request sending
//! - Field mapping with JSONPath extraction
//! - Header passthrough
//! - Error handling for non-2xx responses
//!
//! Uses wiremock as the mock HTTP server to capture and verify requests.

#![cfg(feature = "http-client")]

mod common;

use common::{default_test_timeout, run_engine, wait_for_requests};
use wiremock::matchers::{body_json, header, method, path};
use wiremock::{Match, Mock, MockServer, Request, ResponseTemplate};

/// Scenario: the HTTP sink relays a polled payload with POST.
///
/// A mock source serves a fixed JSON document on `GET /data`. The pipeline is
/// wired source -> pass-through transform -> HTTP sink, and the mock sink only
/// accepts `POST /webhook` requests whose JSON body equals that same document.
/// Both mocks require at least one hit; wiremock checks those expectations when
/// the mock servers are dropped at the end of the test.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_sink_basic_post() {
    if common::skip_if_no_network() {
        return;
    }

    let upstream = MockServer::start().await;
    let downstream = MockServer::start().await;

    // One document is both served by the source and expected at the sink.
    let payload = serde_json::json!({
        "event": "test",
        "value": 42
    });

    // Source side: hand out the document on every poll.
    let source_mock = Mock::given(method("GET"))
        .and(path("/data"))
        .respond_with(ResponseTemplate::new(200).set_body_json(&payload))
        .expect(1..);
    source_mock.mount(&upstream).await;

    // Sink side: accept only an exact copy of the document.
    let sink_mock = Mock::given(method("POST"))
        .and(path("/webhook"))
        .and(body_json(&payload))
        .respond_with(ResponseTemplate::new(200))
        .expect(1..);
    sink_mock.mount(&downstream).await;

    let pipeline_config = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}/data"
        interval: "100ms"
  transforms:
    - id: pass_through
      inputs: [source]
      outputs: [http_sink]
  sinks:
    - id: http_sink
      type: http_client
      config:
        url: "{}/webhook"
        method: POST
"#,
        upstream.uri(),
        downstream.uri()
    );

    let (stop_signal, engine_task) = run_engine(&pipeline_config).await;

    // Presumably blocks until the sink has seen one request or the timeout
    // elapses; the helper lives in `common` — confirm there.
    wait_for_requests(&downstream, 1, default_test_timeout()).await;

    // A failed send is deliberately ignored: shutdown is best-effort here.
    let _ = stop_signal.send(());
    let outcome = engine_task.await.expect("Engine task panicked");

    assert!(outcome.is_ok(), "Engine should run without error");
}

/// Scenario: the HTTP sink builds its request body from field mappings.
///
/// The mock source returns a nested user document. The sink is configured with
/// three `fields` entries: two extracted via JSONPath (`$.data.user.id`,
/// `$.data.user.name`) and one constant (`event_type`). The mock sink only
/// matches `POST /events` requests whose body carries all three mapped values.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_sink_with_field_mappings() {
    if common::skip_if_no_network() {
        return;
    }

    let upstream = MockServer::start().await;
    let downstream = MockServer::start().await;

    // Nested document served by the source.
    let user_document = serde_json::json!({
        "data": {
            "user": {
                "id": 123,
                "name": "Alice"
            }
        }
    });
    let source_mock = Mock::given(method("GET"))
        .and(path("/users"))
        .respond_with(ResponseTemplate::new(200).set_body_json(user_document))
        .expect(1..);
    source_mock.mount(&upstream).await;

    // Each matcher pins one top-level key of the transformed payload.
    let has_user_id = BodyContains::new("user_id", serde_json::json!(123));
    let has_user_name = BodyContains::new("user_name", serde_json::json!("Alice"));
    let has_event_type = BodyContains::new("event_type", serde_json::json!("user_update"));

    let sink_mock = Mock::given(method("POST"))
        .and(path("/events"))
        .and(has_user_id)
        .and(has_user_name)
        .and(has_event_type)
        .respond_with(ResponseTemplate::new(200))
        .expect(1..);
    sink_mock.mount(&downstream).await;

    let pipeline_config = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}/users"
        interval: "100ms"
  transforms:
    - id: pass_through
      inputs: [source]
      outputs: [http_sink]
  sinks:
    - id: http_sink
      type: http_client
      config:
        url: "{}/events"
        fields:
          - name: user_id
            from: "$.data.user.id"
          - name: user_name
            from: "$.data.user.name"
          - name: event_type
            value: "user_update"
"#,
        upstream.uri(),
        downstream.uri()
    );

    let (stop_signal, engine_task) = run_engine(&pipeline_config).await;

    wait_for_requests(&downstream, 1, default_test_timeout()).await;

    // A failed send is deliberately ignored: shutdown is best-effort here.
    let _ = stop_signal.send(());
    let outcome = engine_task.await.expect("Engine task panicked");

    assert!(outcome.is_ok(), "Engine should run without error");
}

/// Scenario: the HTTP sink attaches configured headers to its requests.
///
/// The sink config declares `Authorization` and `X-Custom-Header`; the mock
/// sink only matches `POST /api` requests carrying both header values, and it
/// must be hit at least once.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_sink_with_headers() {
    if common::skip_if_no_network() {
        return;
    }

    let upstream = MockServer::start().await;
    let downstream = MockServer::start().await;

    // Source side: a trivial document, since only the headers matter here.
    let source_body = serde_json::json!({"ok": true});
    let source_mock = Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(source_body))
        .expect(1..);
    source_mock.mount(&upstream).await;

    // Sink side: both configured headers must be present on the request.
    let sink_mock = Mock::given(method("POST"))
        .and(path("/api"))
        .and(header("Authorization", "Bearer test-token"))
        .and(header("X-Custom-Header", "custom-value"))
        .respond_with(ResponseTemplate::new(200))
        .expect(1..);
    sink_mock.mount(&downstream).await;

    let pipeline_config = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "100ms"
  transforms:
    - id: pass_through
      inputs: [source]
      outputs: [http_sink]
  sinks:
    - id: http_sink
      type: http_client
      config:
        url: "{}/api"
        headers:
          Authorization: "Bearer test-token"
          X-Custom-Header: "custom-value"
"#,
        upstream.uri(),
        downstream.uri()
    );

    let (stop_signal, engine_task) = run_engine(&pipeline_config).await;

    wait_for_requests(&downstream, 1, default_test_timeout()).await;

    // A failed send is deliberately ignored: shutdown is best-effort here.
    let _ = stop_signal.send(());
    let outcome = engine_task.await.expect("Engine task panicked");

    assert!(outcome.is_ok(), "Engine should run without error");
}

/// Scenario: the HTTP sink honours `method: PUT`.
///
/// The mock sink only matches `PUT /resource` and must be hit at least once,
/// so the test fails if the sink sends a different verb. The request body is
/// not inspected.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_sink_put_method() {
    if common::skip_if_no_network() {
        return;
    }

    let upstream = MockServer::start().await;
    let downstream = MockServer::start().await;

    // Source side: a minimal document to push through the pipeline.
    let source_body = serde_json::json!({"id": 1});
    let source_mock = Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(source_body))
        .expect(1..);
    source_mock.mount(&upstream).await;

    // Sink side: only the verb and path are constrained.
    let sink_mock = Mock::given(method("PUT"))
        .and(path("/resource"))
        .respond_with(ResponseTemplate::new(200))
        .expect(1..);
    sink_mock.mount(&downstream).await;

    let pipeline_config = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "100ms"
  transforms:
    - id: pass_through
      inputs: [source]
      outputs: [http_sink]
  sinks:
    - id: http_sink
      type: http_client
      config:
        url: "{}/resource"
        method: PUT
"#,
        upstream.uri(),
        downstream.uri()
    );

    let (stop_signal, engine_task) = run_engine(&pipeline_config).await;

    wait_for_requests(&downstream, 1, default_test_timeout()).await;

    // A failed send is deliberately ignored: shutdown is best-effort here.
    let _ = stop_signal.send(());
    let outcome = engine_task.await.expect("Engine task panicked");

    assert!(outcome.is_ok(), "Engine should run without error");
}

/// Scenario: sink failures do not bring the pipeline down.
///
/// The mock sink answers the first two `POST /webhook` requests with HTTP 500
/// and every later one with HTTP 200. The test waits until three requests have
/// reached the sink, shuts the engine down, and asserts that it finished
/// without error.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_sink_error_handling() {
    if common::skip_if_no_network() {
        return;
    }

    let upstream = MockServer::start().await;
    let downstream = MockServer::start().await;

    // The source must be polled at least three times to feed three sink calls.
    let source_body = serde_json::json!({"data": "test"});
    let source_mock = Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(source_body))
        .expect(3..);
    source_mock.mount(&upstream).await;

    // Mount order matters: the failing mock is registered first and stops
    // matching after two hits, after which the success mock takes over.
    let failing_sink = Mock::given(method("POST"))
        .and(path("/webhook"))
        .respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error"))
        .up_to_n_times(2);
    failing_sink.mount(&downstream).await;

    let healthy_sink = Mock::given(method("POST"))
        .and(path("/webhook"))
        .respond_with(ResponseTemplate::new(200))
        .expect(1..);
    healthy_sink.mount(&downstream).await;

    let pipeline_config = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "100ms"
  transforms:
    - id: pass_through
      inputs: [source]
      outputs: [http_sink]
  sinks:
    - id: http_sink
      type: http_client
      config:
        url: "{}/webhook"
"#,
        upstream.uri(),
        downstream.uri()
    );

    let (stop_signal, engine_task) = run_engine(&pipeline_config).await;

    // Two failures plus one success.
    wait_for_requests(&downstream, 3, default_test_timeout()).await;

    // A failed send is deliberately ignored: shutdown is best-effort here.
    let _ = stop_signal.send(());
    let outcome = engine_task.await.expect("Engine task panicked");

    // The engine is expected to finish cleanly even though some sink calls
    // failed. NOTE(review): the original comment says failed messages go to a
    // DLQ — not verifiable from this file; confirm against the engine.
    assert!(
        outcome.is_ok(),
        "Pipeline should complete despite sink errors"
    );
}

/// Helper matcher for checking specific fields in a JSON body.
///
/// A request matches when its body parses as JSON and the top-level entry
/// named `key` equals `value`.
struct BodyContains {
    /// Top-level key looked up in the parsed request body.
    key: String,
    /// Value the key must hold for the request to match.
    value: serde_json::Value,
}

impl BodyContains {
    fn new(key: impl Into<String>, value: serde_json::Value) -> Self {
        Self {
            key: key.into(),
            value,
        }
    }
}

impl Match for BodyContains {
    /// Returns `true` when the request body is valid JSON and its entry for
    /// `self.key` equals `self.value`; a body that fails to parse never matches.
    fn matches(&self, request: &Request) -> bool {
        match serde_json::from_slice::<serde_json::Value>(&request.body) {
            Ok(parsed) => parsed.get(self.key.as_str()) == Some(&self.value),
            Err(_) => false,
        }
    }
}