#![allow(dead_code)]
use pipeflow::config::Config;
use pipeflow::engine::Engine;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep;
/// Test fixture bundling a mock HTTP server with a scratch directory.
///
/// Both members are created together by `TestHarness::new`.
pub struct TestHarness {
/// Mock HTTP server started by `TestHarness::new`.
pub mock_server: wiremock::MockServer,
/// Temporary directory; `TestHarness::output_path` resolves file names inside it.
pub temp_dir: tempfile::TempDir,
}
impl TestHarness {
    /// Starts a mock HTTP server and creates a new temporary directory.
    ///
    /// # Panics
    /// Panics if the temporary directory cannot be created.
    pub async fn new() -> Self {
        let mock_server = wiremock::MockServer::start().await;
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        Self {
            mock_server,
            temp_dir,
        }
    }

    /// Returns `filename` joined onto the harness's temporary directory path.
    pub fn output_path(&self, filename: &str) -> PathBuf {
        self.temp_dir.path().join(filename)
    }

    /// Builds an engine from `yaml` and runs it on a background task.
    ///
    /// This is a thin wrapper around [`run_engine`]; the harness state is not
    /// consulted, so both entry points behave identically. See [`run_engine`]
    /// for the meaning of the returned pair and the panic conditions.
    pub async fn run_pipeline(
        &self,
        yaml: &str,
    ) -> (
        tokio::sync::oneshot::Sender<()>,
        JoinHandle<anyhow::Result<()>>,
    ) {
        // Delegate instead of duplicating the parse/build/spawn sequence.
        run_engine(yaml).await
    }
}

/// Parses `yaml`, builds an [`Engine`] from it and runs the engine on a
/// spawned Tokio task.
///
/// Returns `(shutdown_tx, handle)`:
/// * `shutdown_tx` — sending half of a oneshot channel. The future handed to
///   `Engine::run_with_signal` awaits the receiving half and ignores the
///   result, so it completes as soon as a value is sent *or* the sender is
///   dropped.
/// * `handle` — join handle of the spawned task; it yields the engine's run
///   result with the error converted into `anyhow::Error`.
///
/// # Panics
/// Panics if the YAML cannot be parsed, the engine cannot be created from the
/// configuration, or the engine fails to build.
pub async fn run_engine(
    yaml: &str,
) -> (
    tokio::sync::oneshot::Sender<()>,
    JoinHandle<anyhow::Result<()>>,
) {
    let config = Config::from_yaml(yaml).expect("Failed to parse YAML");
    let mut engine = Engine::from_config(config).expect("Failed to create Engine");
    engine.build().await.expect("Failed to build Engine");

    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    let handle = tokio::spawn(async move {
        // Either outcome of the receive (value or closed channel) ends the signal.
        let shutdown_signal = async {
            let _ = shutdown_rx.await;
        };
        engine
            .run_with_signal(shutdown_signal)
            .await
            .map_err(anyhow::Error::from)
    });
    (shutdown_tx, handle)
}
/// Returns the timeout tests should use when waiting on asynchronous results.
///
/// Reads `PIPEFLOW_TEST_TIMEOUT` and interprets it as a whole number of
/// seconds. When the variable is unset, not valid Unicode, or does not parse
/// as a `u64`, the timeout falls back to 10 seconds.
pub fn default_test_timeout() -> Duration {
    const FALLBACK_SECS: u64 = 10;

    let secs = match std::env::var("PIPEFLOW_TEST_TIMEOUT") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(FALLBACK_SECS),
        Err(_) => FALLBACK_SECS,
    };
    Duration::from_secs(secs)
}
/// Reports whether network-dependent tests may run.
///
/// Returns `false` when `PIPEFLOW_SKIP_NETWORK_TESTS` is exactly `1`, `true`
/// or `yes` (the comparison is case-sensitive). Otherwise the result is
/// whether a TCP listener can be bound to `127.0.0.1` on an OS-assigned port.
pub fn network_tests_enabled() -> bool {
    let opted_out = match std::env::var("PIPEFLOW_SKIP_NETWORK_TESTS") {
        Ok(flag) => flag == "1" || flag == "true" || flag == "yes",
        Err(_) => false,
    };
    // Short-circuit: the bind probe is only attempted when not opted out.
    !opted_out && std::net::TcpListener::bind("127.0.0.1:0").is_ok()
}
/// Returns `true` when the calling test should be skipped because
/// `network_tests_enabled` reports `false`, logging a notice to stderr.
///
/// Note that the notice is printed for every `false` result of
/// `network_tests_enabled`, including the case where tests were disabled via
/// `PIPEFLOW_SKIP_NETWORK_TESTS` rather than by a failed bind.
pub fn skip_if_no_network() -> bool {
    let should_skip = !network_tests_enabled();
    if should_skip {
        eprintln!("Skipping network-dependent test: localhost bind unavailable.");
    }
    should_skip
}
/// Polls `path` until it contains non-whitespace text and returns the whole
/// file content.
///
/// The file is read before the deadline is checked, so it is always read at
/// least once (even with a zero `timeout`) and once more when the deadline is
/// reached; content that lands during the last polling interval is therefore
/// still picked up. Read errors (for example, the file not existing yet) are
/// treated as "not ready yet".
///
/// # Panics
/// Panics if no non-blank content has been seen once `timeout` has elapsed.
pub async fn wait_for_file_content(path: impl AsRef<Path>, timeout: Duration) -> String {
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let start = std::time::Instant::now();
    let path = path.as_ref();
    loop {
        match std::fs::read_to_string(path) {
            Ok(content) if !content.trim().is_empty() => return content,
            _ => {}
        }
        let elapsed = start.elapsed();
        if elapsed >= timeout {
            panic!("Timeout waiting for file content at: {}", path.display());
        }
        // Never sleep past the deadline, so the last read happens right at it.
        sleep(POLL_INTERVAL.min(timeout - elapsed)).await;
    }
}
/// Polls `path` until `predicate` accepts its content and returns that content.
///
/// The file is read before the deadline is checked, so `predicate` is given a
/// chance at least once (even with a zero `timeout`) and once more when the
/// deadline is reached; a change that lands during the last polling interval
/// is therefore still observed. Read errors are treated as "not ready yet" and
/// `predicate` is not called for them.
///
/// # Panics
/// Panics if `predicate` has not accepted the content once `timeout` has
/// elapsed.
pub async fn wait_for_file_condition(
    path: impl AsRef<Path>,
    timeout: Duration,
    predicate: impl Fn(&str) -> bool,
) -> String {
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let start = std::time::Instant::now();
    let path = path.as_ref();
    loop {
        match std::fs::read_to_string(path) {
            Ok(content) if predicate(&content) => return content,
            _ => {}
        }
        let elapsed = start.elapsed();
        if elapsed >= timeout {
            panic!("Timeout waiting for condition at: {}", path.display());
        }
        // Never sleep past the deadline, so the last read happens right at it.
        sleep(POLL_INTERVAL.min(timeout - elapsed)).await;
    }
}
/// Polls `path` until it contains at least `min_lines` lines and returns all
/// lines of the file as owned strings.
///
/// The file is read before the deadline is checked, so it is always read at
/// least once (even with a zero `timeout`) and once more when the deadline is
/// reached; lines written during the last polling interval are therefore
/// still counted. Read errors (for example, the file not existing yet) are
/// treated as "not ready yet", including when `min_lines` is zero.
///
/// # Panics
/// Panics if fewer than `min_lines` lines are present once `timeout` has
/// elapsed.
pub async fn wait_for_lines(
    path: impl AsRef<Path>,
    min_lines: usize,
    timeout: Duration,
) -> Vec<String> {
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let start = std::time::Instant::now();
    let path = path.as_ref();
    loop {
        if let Ok(content) = std::fs::read_to_string(path) {
            let lines: Vec<String> = content.lines().map(str::to_owned).collect();
            if lines.len() >= min_lines {
                return lines;
            }
        }
        let elapsed = start.elapsed();
        if elapsed >= timeout {
            panic!(
                "Timeout waiting for {} lines at: {}",
                min_lines,
                path.display()
            );
        }
        // Never sleep past the deadline, so the last read happens right at it.
        sleep(POLL_INTERVAL.min(timeout - elapsed)).await;
    }
}
/// Asks the OS for a free TCP port on `127.0.0.1` and returns its number.
///
/// The probing listener is closed before this function returns, so the port
/// is not reserved: another process may claim it before the caller binds it.
///
/// # Panics
/// Panics if the listener cannot be bound or its local address cannot be read.
pub fn allocate_local_port() -> u16 {
    let probe =
        std::net::TcpListener::bind("127.0.0.1:0").expect("Failed to bind to random port");
    let bound_addr = probe.local_addr().expect("Failed to read local addr");
    bound_addr.port()
}
/// POSTs `payload` as JSON to `url`, retrying while the request fails to send.
///
/// Up to 20 attempts are made, with a 50 ms pause after each failed attempt.
/// When `bearer` is `Some`, the token is attached as bearer authentication.
/// The first response that is received is returned as-is; no status check is
/// performed here.
///
/// # Panics
/// Panics with the last send error if all attempts fail.
pub async fn post_json_with_retry(
    url: &str,
    payload: &serde_json::Value,
    bearer: Option<&str>,
) -> reqwest::Response {
    let http = reqwest::Client::new();
    let mut failure = None;
    for _attempt in 0..20 {
        let base = http.post(url).json(payload);
        let outgoing = match bearer {
            Some(token) => base.bearer_auth(token),
            None => base,
        };
        let err = match outgoing.send().await {
            Ok(response) => return response,
            Err(err) => err,
        };
        failure = Some(err);
        sleep(Duration::from_millis(50)).await;
    }
    panic!("Failed to POST after retries: {:?}", failure);
}
/// Waits until `server` has recorded at least `count` requests.
///
/// The recorded-request count is sampled before the deadline is checked, so
/// it is sampled at least once (even with a zero `timeout`) and once more
/// when the deadline is reached; the function only panics when that final
/// sample is still below `count`. If the server yields no request list, the
/// count is treated as 0.
///
/// # Panics
/// Panics, reporting the last observed count, if fewer than `count` requests
/// have been recorded once `timeout` has elapsed.
pub async fn wait_for_requests(server: &wiremock::MockServer, count: usize, timeout: Duration) {
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    let start = std::time::Instant::now();
    loop {
        let received = server
            .received_requests()
            .await
            .map(|r| r.len())
            .unwrap_or(0);
        if received >= count {
            return;
        }
        let elapsed = start.elapsed();
        if elapsed >= timeout {
            panic!(
                "Timeout waiting for {} requests. Received: {}",
                count, received
            );
        }
        // Never sleep past the deadline, so the last sample is taken right at it.
        sleep(POLL_INTERVAL.min(timeout - elapsed)).await;
    }
}
/// Test fixture that owns a running engine task together with a clone of the
/// engine's system channels.
pub struct SystemSourceHarness {
/// Temporary directory; `SystemSourceHarness::output_path` resolves file names inside it.
pub temp_dir: tempfile::TempDir,
/// Clone of the engine's system channels, taken in `new` after the engine is built.
pub channels: pipeflow::engine::SystemChannels,
/// Sending half of the oneshot channel whose receiver is awaited as the engine's shutdown signal.
shutdown_tx: tokio::sync::oneshot::Sender<()>,
/// Join handle of the spawned task that runs the engine; the run result is discarded inside the task.
engine_handle: tokio::task::JoinHandle<()>,
}
impl SystemSourceHarness {
    /// Builds an engine from `yaml`, clones its system channels and starts the
    /// engine on a spawned Tokio task.
    ///
    /// The engine's run result is discarded inside the task.
    ///
    /// # Panics
    /// Panics if the temporary directory cannot be created, the YAML cannot be
    /// parsed, the engine cannot be created or built, or the engine exposes no
    /// system channels.
    pub async fn new(yaml: &str) -> Self {
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");

        let parsed = Config::from_yaml(yaml).expect("Failed to parse YAML");
        let mut engine = Engine::from_config(parsed).expect("Failed to create Engine");
        engine.build().await.expect("Failed to build Engine");

        // Take the channels before the engine is moved into its task.
        let channels = engine
            .system_channels()
            .expect("System channels should exist")
            .clone();

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
        let engine_handle = tokio::spawn(async move {
            // Completes on an explicit send or when the sender is dropped.
            let stop_signal = async move {
                let _ = shutdown_rx.await;
            };
            let _ = engine.run_with_signal(stop_signal).await;
        });

        Self {
            temp_dir,
            channels,
            shutdown_tx,
            engine_handle,
        }
    }

    /// Returns `filename` joined onto the harness's temporary directory path.
    pub fn output_path(&self, filename: &str) -> PathBuf {
        self.temp_dir.path().join(filename)
    }

    /// Sends the shutdown signal and waits up to `timeout` for the engine task
    /// to finish.
    ///
    /// This is best-effort: a failed send, a join error and an elapsed timeout
    /// are all ignored.
    pub async fn shutdown(self, timeout: Duration) {
        // `..` leaves the remaining fields in `self`, so they are still dropped
        // only when this function returns.
        let Self {
            shutdown_tx,
            engine_handle,
            ..
        } = self;
        let _ = shutdown_tx.send(());
        let _ = tokio::time::timeout(timeout, engine_handle).await;
    }
}