//! pipeflow 0.0.4
//!
//! A lightweight, configuration-driven data pipeline framework.
//! E2E tests for system sources (DLQ, Event, Audit)

#![cfg(feature = "file")]

mod common;

use std::fs;
use std::time::Duration;

use pipeflow::Message;
use pipeflow::common::types::{Audit, Event};

#[tokio::test]
async fn test_dlq_source_to_file() {
    // Wire the system DLQ source into a JSONL file sink and verify that a
    // message pushed onto the DLQ channel ends up written to disk.
    let tmp = tempfile::tempdir().expect("Failed to create temp dir");
    let out_path = tmp.path().join("dlq_output.jsonl");

    let config_yaml = format!(
        r#"
pipeline:
  transforms:
    - id: dlq_to_file
      inputs: [source::system::dlq]
      outputs: [dlq_file]

  sinks:
    - id: dlq_file
      type: file
      config:
        path: "{}"
"#,
        out_path.display()
    );

    let harness = common::SystemSourceHarness::new(&config_yaml).await;

    // Simulate a processing failure by sending the original message to the DLQ.
    let failed_msg = Message::new("test_source", serde_json::json!({"data": "test_failed"}));
    let dlq = &harness.channels.dlq;
    dlq.send(failed_msg).await.expect("Failed to send to DLQ");

    // Block (bounded by a timeout) until the sink flushes something to disk.
    let file_content = common::wait_for_file_content(&out_path, Duration::from_secs(5)).await;

    // Tear the pipeline down before asserting on the captured content.
    harness.shutdown(Duration::from_secs(5)).await;

    assert!(!file_content.is_empty(), "Output file should not be empty");

    // The DLQ source re-wraps the failed Message, so the written JSON nests:
    // { meta: ..., payload: { meta: ..., payload: { data: "test_failed" } } }
    let first_line = file_content
        .lines()
        .next()
        .expect("Should have at least one line");
    let parsed: serde_json::Value = serde_json::from_str(first_line).expect("Valid JSON");

    let inner = &parsed["payload"];
    assert!(
        inner["meta"].is_object(),
        "Should contain original message meta"
    );
    assert_eq!(
        inner["payload"]["data"], "test_failed",
        "Should contain original message payload"
    );
}

#[tokio::test]
async fn test_event_source_processing() {
    // Wire the system event source into a JSONL file sink and verify that an
    // Event (name + labels) sent on the event channel round-trips to disk.
    let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
    let output_path = temp_dir.path().join("event_output.jsonl");

    let config_yaml = format!(
        r#"
pipeline:
  transforms:
    - id: event_to_file
      inputs: [source::system::event]
      outputs: [event_file]

  sinks:
    - id: event_file
      type: file
      config:
        path: "{}"
"#,
        output_path.display()
    );

    let harness = common::SystemSourceHarness::new(&config_yaml).await;

    // Send an event carrying both a JSON payload and a label.
    let event = Event::new("pipeline_started", serde_json::json!({"version": "1.0.0"}))
        .with_label("env", "test");

    harness
        .channels
        .event
        .send(event)
        .await
        .expect("Failed to send event");

    // Wait (bounded by a timeout) for the sink to flush, then shut down.
    let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    harness.shutdown(Duration::from_secs(5)).await;

    // Verify output
    let line = file_content
        .lines()
        .next()
        .expect("Should have at least one line");
    let parsed: serde_json::Value = serde_json::from_str(line).expect("Valid JSON");

    let payload = &parsed["payload"];
    assert_eq!(payload["name"], "pipeline_started");
    // Assert the exact label value, not merely that some string is present:
    // the weaker `.as_str().is_some()` check would pass even if the label
    // value were corrupted in transit.
    assert_eq!(payload["labels"]["env"], "test");
}

#[tokio::test]
async fn test_audit_source_processing() {
    // Wire the system audit source into a JSONL file sink and verify that an
    // audit record sent on the audit channel is persisted with all its fields.
    let tmp = tempfile::tempdir().expect("Failed to create temp dir");
    let out_path = tmp.path().join("audit_output.jsonl");

    let config_yaml = format!(
        r#"
pipeline:
  transforms:
    - id: audit_to_file
      inputs: [source::system::audit]
      outputs: [audit_file]

  sinks:
    - id: audit_file
      type: file
      config:
        path: "{}"
"#,
        out_path.display()
    );

    let harness = common::SystemSourceHarness::new(&config_yaml).await;

    // Build a successful sink-write audit record with an id and a label.
    let audit = Audit::new("test_sink", "sink", "write")
        .with_message_id("msg-123")
        .with_label("batch", "1")
        .success(150);

    let audit_tx = &harness.channels.audit;
    audit_tx.send(audit).await.expect("Failed to send audit");

    // Wait (bounded by a timeout) for the sink to flush, then shut down.
    let file_content = common::wait_for_file_content(&out_path, Duration::from_secs(5)).await;

    harness.shutdown(Duration::from_secs(5)).await;

    // The first JSONL line should carry the audit record under "payload".
    let first_line = file_content
        .lines()
        .next()
        .expect("Should have at least one line");
    let parsed: serde_json::Value = serde_json::from_str(first_line).expect("Valid JSON");

    let record = &parsed["payload"];
    assert_eq!(record["node_id"], "test_sink");
    assert_eq!(record["node_type"], "sink");
    assert_eq!(record["operation"], "write");
    assert_eq!(record["status"], "success");
    assert_eq!(record["duration_ms"], 150);
    assert_eq!(record["message_id"], "msg-123");
}

#[tokio::test]
async fn test_all_system_sources_together() {
    // Run all three system sources (DLQ, event, audit) in a single pipeline,
    // each feeding its own file sink, and check every output file gets content.
    let tmp = tempfile::tempdir().expect("Failed to create temp dir");
    let dlq_path = tmp.path().join("dlq.jsonl");
    let event_path = tmp.path().join("events.jsonl");
    let audit_path = tmp.path().join("audits.jsonl");

    let config_yaml = format!(
        r#"
pipeline:
  transforms:
    - id: dlq_to_file
      inputs: [source::system::dlq]
      outputs: [dlq_file]
    - id: event_to_file
      inputs: [source::system::event]
      outputs: [event_file]
    - id: audit_to_file
      inputs: [source::system::audit]
      outputs: [audit_file]

  sinks:
    - id: dlq_file
      type: file
      config:
        path: "{}"
    - id: event_file
      type: file
      config:
        path: "{}"
    - id: audit_file
      type: file
      config:
        path: "{}"
"#,
        dlq_path.display(),
        event_path.display(),
        audit_path.display()
    );

    let harness = common::SystemSourceHarness::new(&config_yaml).await;

    // One item per channel: a failed message, an event, and an audit record.
    harness
        .channels
        .dlq
        .send(Message::new("src", serde_json::json!({"failed": true})))
        .await
        .unwrap();
    harness
        .channels
        .event
        .send(Event::new("test", serde_json::json!({})))
        .await
        .unwrap();
    harness
        .channels
        .audit
        .send(Audit::new("test_node", "transform", "process").success(10))
        .await
        .unwrap();

    // Wait (bounded by timeouts) until each sink has flushed to its file.
    let timeout = Duration::from_secs(5);
    common::wait_for_file_content(&dlq_path, timeout).await;
    common::wait_for_file_content(&event_path, timeout).await;
    common::wait_for_file_content(&audit_path, timeout).await;

    harness.shutdown(timeout).await;

    // After shutdown, every output file must exist and be non-empty.
    let outputs = [
        ("DLQ", &dlq_path),
        ("Event", &event_path),
        ("Audit", &audit_path),
    ];
    for (name, path) in outputs {
        assert!(path.exists(), "{} output file should exist", name);
        let content = fs::read_to_string(path).unwrap_or_else(|_| panic!("Read {} file", name));
        assert!(!content.is_empty(), "{} file should not be empty", name);
    }
}