pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! E2E test for DLQ on sink failure.

mod common;

use common::{default_test_timeout, run_engine, wait_for_file_content};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[cfg(all(feature = "file", feature = "http-client"))]
async fn test_sink_failure_routes_to_dlq() {
    if common::skip_if_no_network() {
        return;
    }

    let source_server = MockServer::start().await;
    let sink_server = MockServer::start().await;
    let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
    let dlq_path = temp_dir.path().join("dlq.jsonl");

    Mock::given(method("GET"))
        .and(path("/data"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "status": "bad_sink",
            "value": 7
        })))
        .expect(1..)
        .mount(&source_server)
        .await;

    Mock::given(method("POST"))
        .and(path("/webhook"))
        .respond_with(ResponseTemplate::new(500).set_body_string("Sink error"))
        .expect(1..)
        .mount(&sink_server)
        .await;

    let yaml = format!(
        r#"
system:
  sinks:
    dlq: "{dlq_path}"
pipeline:
  sources:
    - id: src
      type: http_client
      config:
        url: "{source_url}/data"
        interval: "100ms"
  transforms:
    - id: pass
      inputs: [src]
      outputs: [sink]
  sinks:
    - id: sink
      type: http_client
      config:
        url: "{sink_url}/webhook"
        method: POST
"#,
        dlq_path = dlq_path.display(),
        source_url = source_server.uri(),
        sink_url = sink_server.uri()
    );

    let (shutdown_tx, engine_handle) = run_engine(&yaml).await;

    let content = wait_for_file_content(&dlq_path, default_test_timeout()).await;

    let _ = shutdown_tx.send(());
    let result = engine_handle.await.expect("Engine task panicked");
    assert!(result.is_ok(), "Engine should run despite sink failures");

    let line = content.lines().next().expect("Expected a DLQ line");
    let parsed: serde_json::Value = serde_json::from_str(line).expect("Valid DLQ JSON");
    // DLQ wraps the original failed message as its payload,
    // so access the nested payload: parsed["payload"]["payload"]
    let original_payload = &parsed["payload"]["payload"];
    assert_eq!(original_payload["status"], "bad_sink");
    assert_eq!(original_payload["value"], 7);
}