// pipeflow 0.0.4
//
// A lightweight, configuration-driven data pipeline framework.
// Documentation
//! E2E tests for HTTP Client to Blackhole pipeline
//!
//! These tests verify blackhole-specific scenarios:
//! - Multiple sources converging to a single blackhole
//! - Long-running stability without memory accumulation
//!
//! The wiremock server expectations verify that data flows through
//! the pipeline by confirming HTTP requests are made.

#![cfg(feature = "http-client")]

mod common;

use common::{default_test_timeout, run_engine, wait_for_requests};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

/// Scenario: Fan-in of Two HTTP Sources into One Blackhole
///
/// Starts two mock HTTP servers, points one `http_client` source at each,
/// and routes both through a single transform into one `blackhole` sink.
/// Each mock is mounted with an expectation of at least one `GET /` request,
/// and the engine result must be `Ok` after shutdown is signalled.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_multiple_sources_to_single_blackhole() {
    if common::skip_if_no_network() {
        return;
    }

    let mock_server_a = MockServer::start().await;
    let mock_server_b = MockServer::start().await;

    // Both servers get the same mock, differing only in the label they return.
    for (server, label) in [(&mock_server_a, "A"), (&mock_server_b, "B")] {
        let reply = ResponseTemplate::new(200).set_body_json(serde_json::json!({"source": label}));
        Mock::given(method("GET"))
            .and(path("/"))
            .respond_with(reply)
            .expect(1..)
            .mount(server)
            .await;
    }

    let pipeline_config = format!(
        r#"
pipeline:
  sources:
    - id: source_a
      type: http_client
      config:
        url: "{url_a}"
        interval: "100ms"
    - id: source_b
      type: http_client
      config:
        url: "{url_b}"
        interval: "100ms"
  transforms:
    - id: pass_through
      inputs: [source_a, source_b]
      outputs: [discard_all]
  sinks:
    - id: discard_all
      type: blackhole
"#,
        url_a = mock_server_a.uri(),
        url_b = mock_server_b.uri()
    );

    let (shutdown_tx, engine_handle) = run_engine(&pipeline_config).await;

    // Wait on server A first, then server B, for at least one request each.
    for server in [&mock_server_a, &mock_server_b] {
        wait_for_requests(server, 1, default_test_timeout()).await;
    }

    // The send result is intentionally ignored.
    let _ = shutdown_tx.send(());
    let outcome = engine_handle.await.expect("Engine task panicked");

    assert!(
        outcome.is_ok(),
        "Multi-source pipeline should run without error"
    );
    // The `expect(1..)` expectations are checked by wiremock when the servers drop.
}

/// Scenario: Blackhole Under a Steady Stream of Requests
///
/// Runs a single `http_client` source polling a mock server every 50ms,
/// routed through one transform into a `blackhole` sink. The test waits
/// until the mock server has seen 10 requests, signals shutdown, and
/// requires the engine to finish with an `Ok` result.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_blackhole_stability_continuous_load() {
    if common::skip_if_no_network() {
        return;
    }

    let mock_server = MockServer::start().await;

    // Fixed JSON payload returned for every poll.
    let payload = serde_json::json!({
        "timestamp": "2026-01-07T12:00:00Z",
        "value": 42
    });
    let reply = ResponseTemplate::new(200).set_body_json(payload);

    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(reply)
        .expect(10..) // The mock must receive no fewer than 10 requests
        .mount(&mock_server)
        .await;

    let pipeline_config = format!(
        r#"
pipeline:
  sources:
    - id: continuous_source
      type: http_client
      config:
        url: "{url}"
        interval: "50ms"
  transforms:
    - id: pass_through
      inputs: [continuous_source]
      outputs: [blackhole]
  sinks:
    - id: blackhole
      type: blackhole
"#,
        url = mock_server.uri()
    );

    let (shutdown_tx, engine_handle) = run_engine(&pipeline_config).await;

    // Keep the pipeline running until the server has recorded 10 requests.
    wait_for_requests(&mock_server, 10, default_test_timeout()).await;

    // The send result is intentionally ignored.
    let _ = shutdown_tx.send(());
    let outcome = engine_handle.await.expect("Engine task panicked");

    // The engine must finish with `Ok` after the sustained run.
    assert!(
        outcome.is_ok(),
        "Blackhole should remain stable under continuous load"
    );
    // The `expect(10..)` expectation is checked by wiremock when the server drops.
}