pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! E2E tests for HTTP server source.
//!
//! Tests:
//! - Basic payload emission from POST requests
//! - Bearer token authentication

#![cfg(all(feature = "http-server", feature = "file"))]

mod common;

use common::{
    TestHarness, allocate_local_port, default_test_timeout, post_json_with_retry,
    wait_for_file_content,
};
use serde_json::json;

/// End-to-end check: a JSON body POSTed to the `http_server` source must show
/// up in the file sink as a message attributed to the `webhook` node.
#[tokio::test]
async fn test_http_server_source_emits_payload() {
    // Nothing to verify when the shared helper asks us to skip.
    if common::skip_if_no_network() {
        return;
    }

    let harness = TestHarness::new().await;
    let sink_file = harness.output_path("http_server.jsonl");
    let listen_addr = format!("127.0.0.1:{}", allocate_local_port());
    let endpoint = format!("http://{}/ingest", listen_addr);

    // Pipeline under test: `webhook` source -> `pass_through` transform -> `out` file sink.
    let pipeline_yaml = format!(
        r#"
pipeline:
  sources:
    - id: webhook
      type: http_server
      config:
        bind: "{bind}"
        path: "/ingest"
  transforms:
    - id: pass_through
      inputs: [webhook]
      outputs: [out]
  sinks:
    - id: out
      type: file
      config:
        path: "{output_path}"
"#,
        bind = listen_addr,
        output_path = sink_file.display()
    );

    let (shutdown_tx, pipeline_task) = harness.run_pipeline(&pipeline_yaml).await;

    // Send one unauthenticated request and expect the server to accept it.
    let body = json!({"hello": "world"});
    let reply = post_json_with_retry(&endpoint, &body, None).await;
    let status = reply.status();
    assert!(status.is_success());

    // The first line written by the sink should decode into the message we sent.
    let written = wait_for_file_content(&sink_file, default_test_timeout()).await;
    let first_line = written.lines().next().expect("Expected a line in output");
    let emitted: pipeflow::Message =
        serde_json::from_str(first_line).expect("Invalid JSON output");

    assert_eq!(emitted.meta.source_node, "webhook");
    assert_eq!(emitted.payload, body);

    // Best-effort teardown: the results of the shutdown signal and of the
    // bounded wait on the pipeline task are intentionally discarded.
    let _ = shutdown_tx.send(());
    let shutdown_wait = tokio::time::timeout(default_test_timeout(), pipeline_task);
    let _ = shutdown_wait.await;
}

/// End-to-end check of bearer authentication on the `http_server` source:
/// a request without a token gets `401 Unauthorized`, while a request
/// carrying the configured token succeeds and reaches the file sink.
#[tokio::test]
async fn test_http_server_source_bearer_auth() {
    // Nothing to verify when the shared helper asks us to skip.
    if common::skip_if_no_network() {
        return;
    }

    let harness = TestHarness::new().await;
    let sink_file = harness.output_path("http_server_auth.jsonl");
    let listen_addr = format!("127.0.0.1:{}", allocate_local_port());
    let endpoint = format!("http://{}/secure", listen_addr);

    // Same pipeline shape as the unauthenticated test, plus a bearer `auth` section on the source.
    let pipeline_yaml = format!(
        r#"
pipeline:
  sources:
    - id: webhook
      type: http_server
      config:
        bind: "{bind}"
        path: "/secure"
        auth:
          type: bearer
          token: "secret-token"
  transforms:
    - id: pass_through
      inputs: [webhook]
      outputs: [out]
  sinks:
    - id: out
      type: file
      config:
        path: "{output_path}"
"#,
        bind = listen_addr,
        output_path = sink_file.display()
    );

    let (shutdown_tx, pipeline_task) = harness.run_pipeline(&pipeline_yaml).await;

    let body = json!({"hello": "secure"});

    // No token supplied: the server must answer 401.
    let rejected = post_json_with_retry(&endpoint, &body, None).await;
    assert_eq!(rejected.status(), reqwest::StatusCode::UNAUTHORIZED);

    // Token matching the configured one: the request must be accepted.
    let accepted = post_json_with_retry(&endpoint, &body, Some("secret-token")).await;
    let accepted_status = accepted.status();
    assert!(accepted_status.is_success());

    // The first line written by the sink should decode into the accepted message.
    let written = wait_for_file_content(&sink_file, default_test_timeout()).await;
    let first_line = written.lines().next().expect("Expected a line in output");
    let emitted: pipeflow::Message =
        serde_json::from_str(first_line).expect("Invalid JSON output");

    assert_eq!(emitted.meta.source_node, "webhook");
    assert_eq!(emitted.payload, body);

    // Best-effort teardown: the results of the shutdown signal and of the
    // bounded wait on the pipeline task are intentionally discarded.
    let _ = shutdown_tx.send(());
    let shutdown_wait = tokio::time::timeout(default_test_timeout(), pipeline_task);
    let _ = shutdown_wait.await;
}