#![cfg(feature = "file")]
mod common;
use std::fs;
use std::time::Duration;
use pipeflow::Message;
use pipeflow::common::types::{Audit, Event};
#[tokio::test]
async fn test_dlq_source_to_file() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let output_path = temp_dir.path().join("dlq_output.jsonl");
let config_yaml = format!(
r#"
pipeline:
transforms:
- id: dlq_to_file
inputs: [source::system::dlq]
outputs: [dlq_file]
sinks:
- id: dlq_file
type: file
config:
path: "{}"
"#,
output_path.display()
);
let harness = common::SystemSourceHarness::new(&config_yaml).await;
let failed_msg = Message::new("test_source", serde_json::json!({"data": "test_failed"}));
harness
.channels
.dlq
.send(failed_msg)
.await
.expect("Failed to send to DLQ");
let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;
harness.shutdown(Duration::from_secs(5)).await;
assert!(!file_content.is_empty(), "Output file should not be empty");
let line = file_content
.lines()
.next()
.expect("Should have at least one line");
let parsed: serde_json::Value = serde_json::from_str(line).expect("Valid JSON");
let inner_msg = &parsed["payload"];
assert!(
inner_msg["meta"].is_object(),
"Should contain original message meta"
);
assert_eq!(
inner_msg["payload"]["data"], "test_failed",
"Should contain original message payload"
);
}
#[tokio::test]
async fn test_event_source_processing() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let output_path = temp_dir.path().join("event_output.jsonl");
let config_yaml = format!(
r#"
pipeline:
transforms:
- id: event_to_file
inputs: [source::system::event]
outputs: [event_file]
sinks:
- id: event_file
type: file
config:
path: "{}"
"#,
output_path.display()
);
let harness = common::SystemSourceHarness::new(&config_yaml).await;
let event = Event::new("pipeline_started", serde_json::json!({"version": "1.0.0"}))
.with_label("env", "test");
harness
.channels
.event
.send(event)
.await
.expect("Failed to send event");
let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;
harness.shutdown(Duration::from_secs(5)).await;
let line = file_content
.lines()
.next()
.expect("Should have at least one line");
let parsed: serde_json::Value = serde_json::from_str(line).expect("Valid JSON");
let payload = &parsed["payload"];
assert_eq!(payload["name"], "pipeline_started");
assert!(payload["labels"]["env"].as_str().is_some());
}
#[tokio::test]
async fn test_audit_source_processing() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let output_path = temp_dir.path().join("audit_output.jsonl");
let config_yaml = format!(
r#"
pipeline:
transforms:
- id: audit_to_file
inputs: [source::system::audit]
outputs: [audit_file]
sinks:
- id: audit_file
type: file
config:
path: "{}"
"#,
output_path.display()
);
let harness = common::SystemSourceHarness::new(&config_yaml).await;
let audit = Audit::new("test_sink", "sink", "write")
.with_message_id("msg-123")
.with_label("batch", "1")
.success(150);
harness
.channels
.audit
.send(audit)
.await
.expect("Failed to send audit");
let file_content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;
harness.shutdown(Duration::from_secs(5)).await;
let line = file_content
.lines()
.next()
.expect("Should have at least one line");
let parsed: serde_json::Value = serde_json::from_str(line).expect("Valid JSON");
let payload = &parsed["payload"];
assert_eq!(payload["node_id"], "test_sink");
assert_eq!(payload["node_type"], "sink");
assert_eq!(payload["operation"], "write");
assert_eq!(payload["status"], "success");
assert_eq!(payload["duration_ms"], 150);
assert_eq!(payload["message_id"], "msg-123");
}
#[tokio::test]
async fn test_all_system_sources_together() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let dlq_path = temp_dir.path().join("dlq.jsonl");
let event_path = temp_dir.path().join("events.jsonl");
let audit_path = temp_dir.path().join("audits.jsonl");
let config_yaml = format!(
r#"
pipeline:
transforms:
- id: dlq_to_file
inputs: [source::system::dlq]
outputs: [dlq_file]
- id: event_to_file
inputs: [source::system::event]
outputs: [event_file]
- id: audit_to_file
inputs: [source::system::audit]
outputs: [audit_file]
sinks:
- id: dlq_file
type: file
config:
path: "{}"
- id: event_file
type: file
config:
path: "{}"
- id: audit_file
type: file
config:
path: "{}"
"#,
dlq_path.display(),
event_path.display(),
audit_path.display()
);
let harness = common::SystemSourceHarness::new(&config_yaml).await;
let failed_msg = Message::new("src", serde_json::json!({"failed": true}));
let event = Event::new("test", serde_json::json!({}));
let audit = Audit::new("test_node", "transform", "process").success(10);
harness.channels.dlq.send(failed_msg).await.unwrap();
harness.channels.event.send(event).await.unwrap();
harness.channels.audit.send(audit).await.unwrap();
common::wait_for_file_content(&dlq_path, Duration::from_secs(5)).await;
common::wait_for_file_content(&event_path, Duration::from_secs(5)).await;
common::wait_for_file_content(&audit_path, Duration::from_secs(5)).await;
harness.shutdown(Duration::from_secs(5)).await;
for (name, path) in [
("DLQ", &dlq_path),
("Event", &event_path),
("Audit", &audit_path),
] {
assert!(path.exists(), "{} output file should exist", name);
let content = fs::read_to_string(path).unwrap_or_else(|_| panic!("Read {} file", name));
assert!(!content.is_empty(), "{} file should not be empty", name);
}
}