pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! E2E tests for HTTP Client to Console pipeline
//!
//! These tests verify real-world scenarios:
//! - API monitoring with data validation
//! - Multi-cycle polling stability
//!
//! Note: Since console output cannot be captured, these tests verify
//! the data flow by checking that the HTTP source successfully fetches
//! data from the mock server. The wiremock expectations serve as proof
//! that the pipeline is operating correctly.

#![cfg(feature = "http-client")]

mod common;

use common::{default_test_timeout, run_engine, wait_for_requests};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

/// Scenario: API Monitoring
///
/// Simulates a typical API polling scenario where we fetch JSON data
/// from a REST endpoint. The wiremock server verifies that the pipeline
/// correctly triggers HTTP requests.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_api_monitoring_pipeline() {
    if common::skip_if_no_network() {
        return;
    }

    let mock_server = MockServer::start().await;

    // Simulate a real API response with nested data
    let api_response = serde_json::json!({
        "status": "healthy",
        "metrics": {
            "cpu_usage": 45.2,
            "memory_mb": 1024,
            "requests_per_sec": 150
        },
        "timestamp": "2026-01-07T12:00:00Z"
    });

    Mock::given(method("GET"))
        .and(path("/api/health"))
        .respond_with(ResponseTemplate::new(200).set_body_json(&api_response))
        .expect(1..) // Must receive at least 1 request
        .mount(&mock_server)
        .await;

    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: health_monitor
      type: http_client
      config:
        url: "{}/api/health"
        interval: "100ms"
  transforms:
    - id: pass_through
      inputs: [health_monitor]
      outputs: [console_out]
  sinks:
    - id: console_out
      type: console
"#,
        mock_server.uri()
    );

    let (shutdown_tx, engine_handle) = run_engine(&yaml).await;

    // Wait for at least one poll cycle
    wait_for_requests(&mock_server, 1, default_test_timeout()).await;

    // Shutdown gracefully
    let _ = shutdown_tx.send(());
    let result = engine_handle.await.expect("Engine task panicked");

    assert!(result.is_ok(), "Engine should shutdown gracefully");
    // wiremock will verify at least 1 request was received on drop
}

/// Scenario: Multi-Cycle Polling Stability
///
/// Verifies that the pipeline remains stable over multiple polling cycles.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_multi_cycle_polling_stability() {
    if common::skip_if_no_network() {
        return;
    }

    let mock_server = MockServer::start().await;

    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({"poll": "ok"})))
        .expect(3..) // Must receive at least 3 requests
        .mount(&mock_server)
        .await;

    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: poller
      type: http_client
      config:
        url: "{}"
        interval: "100ms"
  transforms:
    - id: pass_through
      inputs: [poller]
      outputs: [console_out]
  sinks:
    - id: console_out
      type: console
"#,
        mock_server.uri()
    );

    let (shutdown_tx, engine_handle) = run_engine(&yaml).await;

    // Wait for at least 3 poll cycles
    wait_for_requests(&mock_server, 3, default_test_timeout()).await;

    let _ = shutdown_tx.send(());
    let result = engine_handle.await.expect("Engine task panicked");

    assert!(
        result.is_ok(),
        "Engine should remain stable over multiple cycles"
    );
    // wiremock verifies at least 3 requests were made
}