//! pipeflow 0.0.4
//!
//! A lightweight, configuration-driven data pipeline framework.
#![cfg(feature = "file")]

//! Integration tests for configuration loading behaviors.
//!
//! These tests cover:
//! - Loading from a directory (YAML merge order + normalization + validation)
//! - End-to-end pipeline execution after loading configs

mod common;

use std::fs;
use std::time::Duration;

use pipeflow::common::types::Event;
use pipeflow::config::Config;
use pipeflow::engine::Engine;
use tempfile::TempDir;
use tokio::time::timeout;

/// Builds a throwaway directory for test artifacts; the directory is removed
/// when the returned guard is dropped at the end of the test.
fn create_temp_dir() -> TempDir {
    tempfile::tempdir().expect("Failed to create temp directory")
}

#[tokio::test]
#[cfg(feature = "file")]
async fn test_config_from_directory_merge_and_run() {
    // Lay out a config directory with three YAML fragments; directory loading
    // must merge them in lexical filename order and then normalize + validate.
    let workspace = create_temp_dir();
    let cfg_dir = workspace.path().join("cfg");
    fs::create_dir_all(&cfg_dir).expect("Create cfg dir");

    let output_path = workspace.path().join("out.jsonl");

    // 00_system.yaml: system-level knobs only.
    fs::write(
        cfg_dir.join("00_system.yaml"),
        r#"
system:
  output_buffer_size: 128
"#,
    )
    .expect("Write system config");

    // 01_sources.yaml: an empty transforms list that a later file overrides.
    fs::write(
        cfg_dir.join("01_sources.yaml"),
        r#"
pipeline:
  transforms: []
"#,
    )
    .expect("Write sources config");

    // 02_pipeline.yaml: the actual transform + file-sink wiring.
    let pipeline_yaml = format!(
        r#"
pipeline:
  transforms:
    - id: t1
      inputs: [source::system::event]
      outputs: [sink1]
      steps:
        - type: remap
          config:
            mappings:
              - value: "dir_merge"
                to: "$.mode"

  sinks:
    - id: sink1
      type: file
      config:
        path: "{}"
"#,
        output_path.display()
    );
    fs::write(cfg_dir.join("02_pipeline.yaml"), pipeline_yaml).expect("Write pipeline config");

    // Directory load runs normalize + validate internally.
    let config = Config::from_file(&cfg_dir).expect("Config should load from directory");
    assert_eq!(config.system.output_buffer_size(), 128);
    assert_eq!(config.pipeline.transforms.len(), 1);
    assert_eq!(config.pipeline.sinks.len(), 1);

    // Bring up the engine and grab its input channels before moving it into a task.
    let mut engine = Engine::from_config(config).expect("Engine creation");
    engine.build().await.expect("Engine build");
    let channels = engine
        .system_channels()
        .expect("System channels should exist")
        .clone();

    let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
    let runner = tokio::spawn(async move {
        let signal = async move {
            let _ = stop_rx.await;
        };
        let _ = engine.run_with_signal(signal).await;
    });

    // Feed one event through the merged pipeline and wait for the sink output.
    channels
        .event
        .send(Event::new("dir_merge", serde_json::json!({"hello": "world"})))
        .await
        .expect("Send event");
    let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    // Tear the engine down before inspecting the captured output.
    let _ = stop_tx.send(());
    let _ = timeout(Duration::from_secs(5), runner).await;

    let first_line = file_content
        .lines()
        .next()
        .expect("Output should have at least one line");
    let record: serde_json::Value = serde_json::from_str(first_line).expect("Valid JSONL line");

    // The remap step from 02_pipeline.yaml must have stamped the payload.
    assert_eq!(record["payload"]["mode"], "dir_merge");
}

#[tokio::test]
#[cfg(feature = "file")]
async fn test_pipeline_config_loads_and_runs_end_to_end() {
    let workspace = create_temp_dir();
    let output_path = workspace.path().join("out.jsonl");
    let config_path = workspace.path().join("pipeline.yaml");

    // Single-file config: one remap transform feeding one file sink.
    let yaml = format!(
        r#"
pipeline:
  transforms:
    - id: t1
      inputs: [source::system::event]
      outputs: [out]
      steps:
        - type: remap
          config:
            mappings:
              - value: "outputs_ok"
                to: "$.ok"

  sinks:
    - id: out
      type: file
      config:
        path: "{}"
"#,
        output_path.display()
    );
    fs::write(&config_path, yaml).expect("Write config file");

    let config = Config::from_file(&config_path).expect("Config should load from file");
    assert_eq!(config.pipeline.transforms.len(), 1);
    assert_eq!(config.pipeline.sinks.len(), 1);

    // Build the engine, then run it on a background task until signalled.
    let mut engine = Engine::from_config(config).expect("Engine creation");
    engine.build().await.expect("Engine build");
    let channels = engine
        .system_channels()
        .expect("System channels should exist")
        .clone();

    let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
    let runner = tokio::spawn(async move {
        let signal = async move {
            let _ = stop_rx.await;
        };
        let _ = engine.run_with_signal(signal).await;
    });

    // Inject one event and wait until the sink has flushed something to disk.
    channels
        .event
        .send(Event::new("outputs", serde_json::json!({})))
        .await
        .expect("Send event");
    let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    let _ = stop_tx.send(());
    let _ = timeout(Duration::from_secs(5), runner).await;

    let first_line = file_content
        .lines()
        .next()
        .expect("Output should have at least one line");
    let record: serde_json::Value = serde_json::from_str(first_line).expect("Valid JSONL line");

    // The remap step must have written the marker value into the payload.
    assert_eq!(record["payload"]["ok"], "outputs_ok");
}