pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! Tests for error handling in the pipeline
//!
//! Verifies behavior when sources encounter errors (e.g., HTTP 500/404).
//! The pipeline is expected to keep running (or retry, depending on its policy),
//! but it must never panic.

#![cfg(feature = "http-client")]

mod common;

use common::run_engine;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

/// Scenario: HTTP Source returns 500 Internal Server Error
///
/// The engine is expected to log the error and keep polling rather than crash.
/// We verify that it survives repeated 5xx responses, shuts down cleanly when
/// asked, and that nothing reaches the file sink (no valid payload was received).
#[cfg(feature = "file")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_source_500_error() {
    // Upper bound on how long we wait for the engine to stop after shutdown is
    // signalled; a regression should fail the test rather than hang CI forever.
    const SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

    if common::skip_if_no_network() {
        return;
    }

    let mock_server = MockServer::start().await;
    let temp_dir = tempfile::tempdir().unwrap();
    let output_path = temp_dir.path().join("output.jsonl");

    // Every GET returns 500. `expect(1..)` makes wiremock fail the test when the
    // server is dropped if the source never polled at all.
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(500))
        .expect(1..)
        .mount(&mock_server)
        .await;

    // The sink path goes into a *single-quoted* YAML scalar. Single-quoted
    // scalars do not interpret backslash escapes, so Windows paths such as
    // `C:\Users\...` survive intact. In a double-quoted scalar, `\U` would be
    // parsed as a Unicode escape. The only character needing escaping is `'`,
    // which YAML writes as `''`.
    let sink_path = output_path.display().to_string().replace('\'', "''");
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "100ms"
  transforms:
    - id: pass_through
      inputs: [source]
      outputs: [sink]
  sinks:
    - id: sink
      type: file
      config:
        path: '{}'
"#,
        mock_server.uri(),
        sink_path
    );

    let (shutdown_tx, engine_handle) = run_engine(&yaml).await;

    // With a 100ms poll interval, this gives the source several failed requests.
    tokio::time::sleep(std::time::Duration::from_millis(500)).await;

    // A failed send only means the receiving side is already gone. The join
    // below reports the engine's actual outcome, so the send result is ignored.
    let _ = shutdown_tx.send(());
    let result = tokio::time::timeout(SHUTDOWN_TIMEOUT, engine_handle)
        .await
        .expect("Engine did not shut down within the timeout")
        .expect("Engine task panicked");
    assert!(
        result.is_ok(),
        "Engine should gracefully handle HTTP errors"
    );

    // The key check is that the engine did not panic. In addition, no record
    // should have been written. The sink may not create the file when nothing
    // was emitted, so a missing file is also acceptable.
    if output_path.exists() {
        let content =
            std::fs::read_to_string(&output_path).expect("failed to read sink output file");
        assert!(content.is_empty(), "File should be empty on source error");
    }
}

/// Scenario: HTTP Source returns 404 Not Found
///
/// The engine should tolerate repeated 4xx responses from the source and
/// still shut down cleanly (returning `Ok`) when asked to stop.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_source_404_error() {
    // Upper bound on how long we wait for the engine to stop after shutdown is
    // signalled; a regression should fail the test rather than hang CI forever.
    const SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

    if common::skip_if_no_network() {
        return;
    }

    let mock_server = MockServer::start().await;

    // Every GET returns 404. `expect(1..)` makes wiremock fail the test when the
    // server is dropped if the source never polled at all.
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(404))
        .expect(1..)
        .mount(&mock_server)
        .await;

    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "100ms"
  transforms:
    - id: pass_through
      inputs: [source]
      outputs: [console]
  sinks:
    - id: console
      type: console
"#,
        mock_server.uri()
    );

    let (shutdown_tx, engine_handle) = run_engine(&yaml).await;

    // With a 100ms poll interval, this gives the source several failed requests.
    tokio::time::sleep(std::time::Duration::from_millis(500)).await;

    // A failed send only means the receiving side is already gone. The join
    // below reports the engine's actual outcome, so the send result is ignored.
    let _ = shutdown_tx.send(());

    let result = tokio::time::timeout(SHUTDOWN_TIMEOUT, engine_handle)
        .await
        .expect("Engine did not shut down within the timeout")
        .expect("Engine task panicked");
    assert!(
        result.is_ok(),
        "Engine should gracefully handle HTTP 404 responses"
    );
}