pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
#![allow(dead_code)]

use pipeflow::config::Config;
use pipeflow::engine::Engine;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep;

pub struct TestHarness {
    pub mock_server: wiremock::MockServer,
    pub temp_dir: tempfile::TempDir,
}

impl TestHarness {
    pub async fn new() -> Self {
        let mock_server = wiremock::MockServer::start().await;
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        Self {
            mock_server,
            temp_dir,
        }
    }

    pub fn output_path(&self, filename: &str) -> PathBuf {
        self.temp_dir.path().join(filename)
    }

    pub async fn run_pipeline(
        &self,
        yaml: &str,
    ) -> (
        tokio::sync::oneshot::Sender<()>,
        JoinHandle<anyhow::Result<()>>,
    ) {
        let config = Config::from_yaml(yaml).expect("Failed to parse YAML");
        let mut engine = Engine::from_config(config).expect("Failed to create Engine");
        engine.build().await.expect("Failed to build Engine");

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let handle = tokio::spawn(async move {
            engine
                .run_with_signal(async {
                    let _ = shutdown_rx.await;
                })
                .await
                .map_err(anyhow::Error::from)
        });

        (shutdown_tx, handle)
    }
}

/// Start an engine from YAML and return shutdown channel + task handle.
///
/// The YAML is parsed into a [`Config`], turned into an [`Engine`] and built
/// before anything is spawned. Then `Engine::run_with_signal` runs on a new Tokio
/// task. That task's shutdown signal completes when the returned sender fires or
/// is dropped, because the receive result is deliberately ignored. Any engine
/// error is converted into `anyhow::Error` in the task's output.
///
/// # Panics
/// Panics if the YAML fails to parse or the engine cannot be created or built.
pub async fn run_engine(
    yaml: &str,
) -> (
    tokio::sync::oneshot::Sender<()>,
    JoinHandle<anyhow::Result<()>>,
) {
    let parsed = Config::from_yaml(yaml).expect("Failed to parse YAML");
    let mut engine = Engine::from_config(parsed).expect("Failed to create Engine");
    engine.build().await.expect("Failed to build Engine");

    let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
    let shutdown_signal = async move {
        // Either a send or a dropped sender ends the wait.
        let _ = stop_rx.await;
    };
    let task = tokio::spawn(async move {
        engine
            .run_with_signal(shutdown_signal)
            .await
            .map_err(anyhow::Error::from)
    });

    (stop_tx, task)
}

/// Returns the default test timeout, configurable via `PIPEFLOW_TEST_TIMEOUT` env var.
///
/// The variable is read as a whole number of seconds. The default of 10 seconds
/// is used when the variable is unset, is not valid Unicode, or does not parse as
/// a plain `u64`.
pub fn default_test_timeout() -> Duration {
    const FALLBACK_SECS: u64 = 10;

    let secs = match std::env::var("PIPEFLOW_TEST_TIMEOUT") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(FALLBACK_SECS),
        Err(_) => FALLBACK_SECS,
    };
    Duration::from_secs(secs)
}

/// Returns true when tests can bind to localhost (required for wiremock servers).
///
/// Setting `PIPEFLOW_SKIP_NETWORK_TESTS` to `1`, `true` or `yes` forces this to
/// return false without probing the network. The match ignores ASCII case and
/// surrounding whitespace, so values such as `TRUE` or `Yes` are honored as well.
/// Otherwise it tries to bind an ephemeral TCP port on `127.0.0.1` and reports
/// whether that succeeded. The probe listener is dropped immediately.
pub fn network_tests_enabled() -> bool {
    let skip_requested = std::env::var("PIPEFLOW_SKIP_NETWORK_TESTS")
        .map(|val| {
            let val = val.trim();
            ["1", "true", "yes"]
                .iter()
                .any(|opt| val.eq_ignore_ascii_case(opt))
        })
        .unwrap_or(false);
    if skip_requested {
        return false;
    }

    std::net::TcpListener::bind("127.0.0.1:0").is_ok()
}

/// Returns true if the caller should skip network-dependent tests.
///
/// Delegates to [`network_tests_enabled`]. When that returns false, a note is
/// printed to stderr. The note names both possible causes, because the skip may
/// come from the `PIPEFLOW_SKIP_NETWORK_TESTS` opt-out rather than a failed bind.
pub fn skip_if_no_network() -> bool {
    let skip = !network_tests_enabled();
    if skip {
        eprintln!(
            "Skipping network-dependent test: localhost bind unavailable \
             or PIPEFLOW_SKIP_NETWORK_TESTS is set."
        );
    }
    skip
}

/// Waits for a file to exist and contain some non-whitespace content.
///
/// The file is polled every 100 ms and its full content is returned once it is
/// non-blank. The file is always read at least once, and once more after the
/// deadline has passed. That way a zero `timeout` still succeeds for an
/// already-populated file, and content written during the final sleep is not
/// missed.
///
/// # Panics
/// Panics if the file is still missing, unreadable or blank when `timeout` elapses.
pub async fn wait_for_file_content(path: impl AsRef<Path>, timeout: Duration) -> String {
    let path = path.as_ref();
    let start = std::time::Instant::now();

    loop {
        if let Ok(content) = std::fs::read_to_string(path) {
            if !content.trim().is_empty() {
                return content;
            }
        }
        // Checked after the read so the last poll happens at or past the deadline.
        if start.elapsed() >= timeout {
            break;
        }
        sleep(Duration::from_millis(100)).await;
    }

    panic!("Timeout waiting for file content at: {}", path.display());
}

/// Waits for a file to satisfy the provided predicate.
///
/// The file is read every 100 ms and its content is passed to `predicate`. The
/// first content for which it returns true is returned. A missing or unreadable
/// file counts as "not yet". The file is checked at least once, and once more
/// after the deadline, so a zero `timeout` still works and a late write during
/// the final sleep is not missed.
///
/// # Panics
/// Panics if the predicate has not been satisfied when `timeout` elapses.
pub async fn wait_for_file_condition(
    path: impl AsRef<Path>,
    timeout: Duration,
    predicate: impl Fn(&str) -> bool,
) -> String {
    let path = path.as_ref();
    let start = std::time::Instant::now();

    loop {
        if let Ok(content) = std::fs::read_to_string(path) {
            if predicate(&content) {
                return content;
            }
        }
        // Checked after the read so the last poll happens at or past the deadline.
        if start.elapsed() >= timeout {
            break;
        }
        sleep(Duration::from_millis(100)).await;
    }

    panic!("Timeout waiting for condition at: {}", path.display());
}

/// Waits for a file to contain at least `min_lines` lines.
///
/// Lines are split with [`str::lines`], so a trailing newline does not add an
/// empty final line. The file is polled every 100 ms and all of its lines are
/// returned once the count is reached. It is checked at least once, and once more
/// after the deadline, so a zero `timeout` still succeeds for a file that is
/// already long enough.
///
/// # Panics
/// Panics if the file has fewer than `min_lines` lines, or cannot be read, when
/// `timeout` elapses.
pub async fn wait_for_lines(
    path: impl AsRef<Path>,
    min_lines: usize,
    timeout: Duration,
) -> Vec<String> {
    let path = path.as_ref();
    let start = std::time::Instant::now();

    loop {
        if let Ok(content) = std::fs::read_to_string(path) {
            // Count first so per-line Strings are only allocated on success.
            if content.lines().count() >= min_lines {
                return content.lines().map(str::to_owned).collect();
            }
        }
        // Checked after the read so the last poll happens at or past the deadline.
        if start.elapsed() >= timeout {
            break;
        }
        sleep(Duration::from_millis(100)).await;
    }

    panic!(
        "Timeout waiting for {} lines at: {}",
        min_lines,
        path.display()
    );
}

/// Allocate a local ephemeral port for http_server tests.
///
/// Binds `127.0.0.1:0` so the OS picks a free port, reads the port back, and
/// then releases the listener. Because the port is free again before the caller
/// binds it, another process could take it in between. This is a rare race that
/// tests should be prepared for.
///
/// # Panics
/// Panics if the bind fails or the listener's local address cannot be read.
pub fn allocate_local_port() -> u16 {
    let probe = std::net::TcpListener::bind("127.0.0.1:0")
        .expect("Failed to bind to random port");
    let bound_addr = probe.local_addr().expect("Failed to read local addr");
    bound_addr.port()
}

/// POST JSON with optional bearer auth, retrying for a short period.
pub async fn post_json_with_retry(
    url: &str,
    payload: &serde_json::Value,
    bearer: Option<&str>,
) -> reqwest::Response {
    let client = reqwest::Client::new();
    let mut last_err = None;
    for _ in 0..20 {
        let mut request = client.post(url).json(payload);
        if let Some(token) = bearer {
            request = request.bearer_auth(token);
        }
        match request.send().await {
            Ok(response) => return response,
            Err(err) => {
                last_err = Some(err);
                sleep(Duration::from_millis(50)).await;
            }
        }
    }
    panic!("Failed to POST after retries: {:?}", last_err);
}

/// Waits for the mock server to receive at least `count` requests.
///
/// Polls `received_requests()` every 50 ms. If that call yields no request list,
/// the count is treated as zero. Every observed count, including the one taken
/// after the deadline, is compared against `count`. A request that arrives
/// during the final sleep therefore counts as success, not a timeout.
///
/// # Panics
/// Panics if fewer than `count` requests have been seen when `timeout` elapses.
/// The message reports the last observed count.
pub async fn wait_for_requests(server: &wiremock::MockServer, count: usize, timeout: Duration) {
    let start = std::time::Instant::now();

    loop {
        let received = server
            .received_requests()
            .await
            .map(|r| r.len())
            .unwrap_or(0);
        if received >= count {
            return;
        }
        // Checked after polling so the last observation is at or past the deadline.
        if start.elapsed() >= timeout {
            panic!(
                "Timeout waiting for {} requests. Received: {}",
                count, received
            );
        }
        sleep(Duration::from_millis(50)).await;
    }
}

/// Test harness for system sources (DLQ, Event, Audit).
///
/// Provides a simplified way to test pipelines with system sources by handling
/// engine setup, channel access, and graceful shutdown.
pub struct SystemSourceHarness {
    pub temp_dir: tempfile::TempDir,
    pub channels: pipeflow::engine::SystemChannels,
    shutdown_tx: tokio::sync::oneshot::Sender<()>,
    engine_handle: tokio::task::JoinHandle<()>,
}

impl SystemSourceHarness {
    /// Create a new harness from a YAML config string.
    ///
    /// The config must reference at least one system source.
    pub async fn new(yaml: &str) -> Self {
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let config = Config::from_yaml(yaml).expect("Failed to parse YAML");
        let mut engine = Engine::from_config(config).expect("Failed to create Engine");
        engine.build().await.expect("Failed to build Engine");

        let channels = engine
            .system_channels()
            .expect("System channels should exist")
            .clone();

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
        let engine_handle = tokio::spawn(async move {
            let _ = engine
                .run_with_signal(async move {
                    let _ = shutdown_rx.await;
                })
                .await;
        });

        Self {
            temp_dir,
            channels,
            shutdown_tx,
            engine_handle,
        }
    }

    /// Get the path to a file in the temp directory.
    pub fn output_path(&self, filename: &str) -> PathBuf {
        self.temp_dir.path().join(filename)
    }

    /// Shutdown the pipeline gracefully with a timeout.
    pub async fn shutdown(self, timeout: Duration) {
        let _ = self.shutdown_tx.send(());
        let _ = tokio::time::timeout(timeout, self.engine_handle).await;
    }
}