pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! E2E tests for system sink defaults.

mod common;

use std::time::Duration;

use pipeflow::common::types::{Audit, Event};
use pipeflow::config::Config;
use pipeflow::engine::Engine;

#[tokio::test]
async fn test_system_event_sink_writes_default_file() {
    let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
    let output_path = temp_dir.path().join("system_event.jsonl");

    let config = Config::from_yaml(&format!(
        r#"
system:
  sinks:
    dir: "{}"
pipeline: {{}}
"#,
        temp_dir.path().display()
    ))
    .expect("Valid config");
    let mut engine = Engine::from_config(config).expect("Engine creation");
    engine.build().await.expect("Engine build");

    let channels = engine
        .system_channels()
        .expect("System channels should exist")
        .clone();

    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    let engine_handle = tokio::spawn(async move {
        let _ = engine
            .run_with_signal(async move {
                let _ = shutdown_rx.await;
            })
            .await;
    });

    let event = Event::new("test_event", serde_json::json!({"data": "hello"}));
    channels.event.send(event).await.expect("Send event");

    let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(Duration::from_secs(5), engine_handle).await;

    assert!(!file_content.is_empty(), "Event file should not be empty");
}

#[tokio::test]
async fn test_system_audit_sink_writes_default_file() {
    let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
    let output_path = temp_dir.path().join("system_audit.jsonl");

    let config = Config::from_yaml(&format!(
        r#"
system:
  sinks:
    dir: "{}"
pipeline: {{}}
"#,
        temp_dir.path().display()
    ))
    .expect("Valid config");
    let mut engine = Engine::from_config(config).expect("Engine creation");
    engine.build().await.expect("Engine build");

    let channels = engine
        .system_channels()
        .expect("System channels should exist")
        .clone();

    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    let engine_handle = tokio::spawn(async move {
        let _ = engine
            .run_with_signal(async move {
                let _ = shutdown_rx.await;
            })
            .await;
    });

    let audit = Audit::new("test_sink", "sink", "write")
        .with_message_id("msg-test")
        .success(50);
    channels.audit.send(audit).await.expect("Send audit");

    let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(Duration::from_secs(5), engine_handle).await;

    assert!(!file_content.is_empty(), "Audit file should not be empty");
}