#![cfg(feature = "file")]
mod common;
use std::fs;
use std::time::Duration;
use pipeflow::common::types::Event;
use pipeflow::config::Config;
use pipeflow::engine::Engine;
use tempfile::TempDir;
use tokio::time::timeout;
fn create_temp_dir() -> TempDir {
tempfile::tempdir().expect("Failed to create temp directory")
}
#[tokio::test]
#[cfg(feature = "file")]
async fn test_config_from_directory_merge_and_run() {
let temp_dir = create_temp_dir();
let cfg_dir = temp_dir.path().join("cfg");
fs::create_dir_all(&cfg_dir).expect("Create cfg dir");
let output_path = temp_dir.path().join("out.jsonl");
fs::write(
cfg_dir.join("00_system.yaml"),
r#"
system:
output_buffer_size: 128
"#,
)
.expect("Write system config");
fs::write(
cfg_dir.join("01_sources.yaml"),
r#"
pipeline:
transforms: []
"#,
)
.expect("Write sources config");
fs::write(
cfg_dir.join("02_pipeline.yaml"),
format!(
r#"
pipeline:
transforms:
- id: t1
inputs: [source::system::event]
outputs: [sink1]
steps:
- type: remap
config:
mappings:
- value: "dir_merge"
to: "$.mode"
sinks:
- id: sink1
type: file
config:
path: "{}"
"#,
output_path.display()
),
)
.expect("Write pipeline config");
let config = Config::from_file(&cfg_dir).expect("Config should load from directory");
assert_eq!(config.system.output_buffer_size(), 128);
assert_eq!(config.pipeline.transforms.len(), 1);
assert_eq!(config.pipeline.sinks.len(), 1);
let mut engine = Engine::from_config(config).expect("Engine creation");
engine.build().await.expect("Engine build");
let channels = engine
.system_channels()
.expect("System channels should exist")
.clone();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let engine_handle = tokio::spawn(async move {
let _ = engine
.run_with_signal(async move {
let _ = shutdown_rx.await;
})
.await;
});
let event = Event::new("dir_merge", serde_json::json!({"hello": "world"}));
channels.event.send(event).await.expect("Send event");
let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;
let _ = shutdown_tx.send(());
let _ = timeout(Duration::from_secs(5), engine_handle).await;
let line = file_content
.lines()
.next()
.expect("Output should have at least one line");
let parsed: serde_json::Value = serde_json::from_str(line).expect("Valid JSONL line");
assert_eq!(parsed["payload"]["mode"], "dir_merge");
}
#[tokio::test]
#[cfg(feature = "file")]
async fn test_pipeline_config_loads_and_runs_end_to_end() {
let temp_dir = create_temp_dir();
let output_path = temp_dir.path().join("out.jsonl");
let config_path = temp_dir.path().join("pipeline.yaml");
fs::write(
&config_path,
format!(
r#"
pipeline:
transforms:
- id: t1
inputs: [source::system::event]
outputs: [out]
steps:
- type: remap
config:
mappings:
- value: "outputs_ok"
to: "$.ok"
sinks:
- id: out
type: file
config:
path: "{}"
"#,
output_path.display()
),
)
.expect("Write config file");
let config = Config::from_file(&config_path).expect("Config should load from file");
assert_eq!(config.pipeline.transforms.len(), 1);
assert_eq!(config.pipeline.sinks.len(), 1);
let mut engine = Engine::from_config(config).expect("Engine creation");
engine.build().await.expect("Engine build");
let channels = engine
.system_channels()
.expect("System channels should exist")
.clone();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let engine_handle = tokio::spawn(async move {
let _ = engine
.run_with_signal(async move {
let _ = shutdown_rx.await;
})
.await;
});
let event = Event::new("outputs", serde_json::json!({}));
channels.event.send(event).await.expect("Send event");
let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;
let _ = shutdown_tx.send(());
let _ = timeout(Duration::from_secs(5), engine_handle).await;
let line = file_content
.lines()
.next()
.expect("Output should have at least one line");
let parsed: serde_json::Value = serde_json::from_str(line).expect("Valid JSONL line");
assert_eq!(parsed["payload"]["ok"], "outputs_ok");
}