mod common;
use std::time::Duration;
use pipeflow::common::types::{Audit, Event};
use pipeflow::config::Config;
use pipeflow::engine::Engine;
#[tokio::test]
async fn test_system_event_sink_writes_default_file() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let output_path = temp_dir.path().join("system_event.jsonl");
let config = Config::from_yaml(&format!(
r#"
system:
sinks:
dir: "{}"
pipeline: {{}}
"#,
temp_dir.path().display()
))
.expect("Valid config");
let mut engine = Engine::from_config(config).expect("Engine creation");
engine.build().await.expect("Engine build");
let channels = engine
.system_channels()
.expect("System channels should exist")
.clone();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let engine_handle = tokio::spawn(async move {
let _ = engine
.run_with_signal(async move {
let _ = shutdown_rx.await;
})
.await;
});
let event = Event::new("test_event", serde_json::json!({"data": "hello"}));
channels.event.send(event).await.expect("Send event");
let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;
let _ = shutdown_tx.send(());
let _ = tokio::time::timeout(Duration::from_secs(5), engine_handle).await;
assert!(!file_content.is_empty(), "Event file should not be empty");
}
#[tokio::test]
async fn test_system_audit_sink_writes_default_file() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let output_path = temp_dir.path().join("system_audit.jsonl");
let config = Config::from_yaml(&format!(
r#"
system:
sinks:
dir: "{}"
pipeline: {{}}
"#,
temp_dir.path().display()
))
.expect("Valid config");
let mut engine = Engine::from_config(config).expect("Engine creation");
engine.build().await.expect("Engine build");
let channels = engine
.system_channels()
.expect("System channels should exist")
.clone();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let engine_handle = tokio::spawn(async move {
let _ = engine
.run_with_signal(async move {
let _ = shutdown_rx.await;
})
.await;
});
let audit = Audit::new("test_sink", "sink", "write")
.with_message_id("msg-test")
.success(50);
channels.audit.send(audit).await.expect("Send audit");
let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;
let _ = shutdown_tx.send(());
let _ = tokio::time::timeout(Duration::from_secs(5), engine_handle).await;
assert!(!file_content.is_empty(), "Audit file should not be empty");
}