#![cfg(all(feature = "file", feature = "http-client"))]
mod common;
use wiremock::matchers::{method, path};
use wiremock::{Mock, ResponseTemplate};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_basic_file_writing() {
if common::skip_if_no_network() {
return;
}
let harness = common::TestHarness::new().await;
let output_path = harness.output_path("output.jsonl");
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"message": "hello",
"count": 42
})))
.expect(1..)
.mount(&harness.mock_server)
.await;
let yaml = format!(
r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [file_sink]
sinks:
- id: file_sink
type: file
config:
path: "{}"
append: true
"#,
harness.mock_server.uri(),
output_path.display()
);
let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
let content = common::wait_for_file_content(&output_path, common::default_test_timeout()).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(result.is_ok(), "Engine should shutdown gracefully");
let lines: Vec<&str> = content.lines().collect();
assert!(!lines.is_empty(), "File should contain at least one line");
let parsed: serde_json::Value =
serde_json::from_str(lines[0]).expect("Line should be valid JSON");
assert!(
parsed.get("payload").is_some(),
"Message should have payload"
);
assert_eq!(parsed["payload"]["message"], "hello");
assert_eq!(parsed["payload"]["count"], 42);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_multiple_messages_accumulate() {
if common::skip_if_no_network() {
return;
}
let harness = common::TestHarness::new().await;
let output_path = harness.output_path("output.jsonl");
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"status": "ok"
})))
.expect(3..) .mount(&harness.mock_server)
.await;
let yaml = format!(
r#"
pipeline:
sources:
- id: poller
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [poller]
outputs: [file_out]
sinks:
- id: file_out
type: file
config:
path: "{}"
"#,
harness.mock_server.uri(),
output_path.display()
);
let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
let lines = common::wait_for_lines(&output_path, 3, common::default_test_timeout()).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(result.is_ok(), "Engine should run without error");
assert!(
lines.len() >= 3,
"File should contain at least 3 lines, got {}",
lines.len()
);
for line in &lines {
let parsed: serde_json::Value = serde_json::from_str(line)
.unwrap_or_else(|_| panic!("Line should be valid JSON: {}", line));
assert_eq!(parsed["payload"]["status"], "ok");
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_overwrite_mode() {
if common::skip_if_no_network() {
return;
}
let harness = common::TestHarness::new().await;
let output_path = harness.output_path("output.jsonl");
std::fs::write(&output_path, "existing content\n").unwrap();
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"new": true
})))
.expect(1..)
.mount(&harness.mock_server)
.await;
let yaml = format!(
r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [file_out]
sinks:
- id: file_out
type: file
config:
path: "{}"
append: false
"#,
harness.mock_server.uri(),
output_path.display()
);
let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
let content =
common::wait_for_file_condition(&output_path, common::default_test_timeout(), |content| {
content.contains(r#""new":true"#) && !content.contains("existing content")
})
.await;
assert!(
content.contains(r#""new":true"#) && !content.contains("existing content"),
"File should contain new content and NO old content"
);
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(result.is_ok());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_file_sink_tsv_format() {
if common::skip_if_no_network() {
return;
}
let harness = common::TestHarness::new().await;
let output_path = harness.output_path("output.tsv");
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"name": "Alice",
"score": 95
})))
.expect(1..)
.mount(&harness.mock_server)
.await;
let yaml = format!(
r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [file_sink]
sinks:
- id: file_sink
type: file
config:
path: "{}"
format: tsv
"#,
harness.mock_server.uri(),
output_path.display()
);
let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
let content = common::wait_for_file_content(&output_path, common::default_test_timeout()).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(result.is_ok(), "Engine should shutdown gracefully");
let lines: Vec<&str> = content.lines().collect();
assert!(!lines.is_empty(), "File should contain at least one line");
let first_line = lines[0];
assert!(
first_line.contains('\t'),
"TSV output should contain tab separators, got: {}",
first_line
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_file_sink_csv_format() {
if common::skip_if_no_network() {
return;
}
let harness = common::TestHarness::new().await;
let output_path = harness.output_path("output.csv");
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"city": "New York",
"temperature": 72
})))
.expect(1..)
.mount(&harness.mock_server)
.await;
let yaml = format!(
r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [file_sink]
sinks:
- id: file_sink
type: file
config:
path: "{}"
format: csv
"#,
harness.mock_server.uri(),
output_path.display()
);
let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
let content = common::wait_for_file_content(&output_path, common::default_test_timeout()).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(result.is_ok(), "Engine should shutdown gracefully");
let lines: Vec<&str> = content.lines().collect();
assert!(!lines.is_empty(), "File should contain at least one line");
let first_line = lines[0];
assert!(
first_line.contains(','),
"CSV output should contain comma separators, got: {}",
first_line
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_file_sink_csv_with_header() {
if common::skip_if_no_network() {
return;
}
let harness = common::TestHarness::new().await;
let output_path = harness.output_path("output_with_header.csv");
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"name": "Bob",
"age": 30
})))
.expect(1..)
.mount(&harness.mock_server)
.await;
let yaml = format!(
r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [file_sink]
sinks:
- id: file_sink
type: file
config:
path: "{}"
format: csv
include_header: true
append: false
"#,
harness.mock_server.uri(),
output_path.display()
);
let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
let lines = common::wait_for_lines(&output_path, 2, common::default_test_timeout()).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(result.is_ok(), "Engine should shutdown gracefully");
assert!(
lines.len() >= 2,
"File should contain at least 2 lines (header + data), got {}",
lines.len()
);
let header = &lines[0];
assert!(
header.contains("name") && header.contains("age"),
"Header should contain column names 'name' and 'age', got: {}",
header
);
let data = &lines[1];
assert!(
data.contains("Bob") && data.contains("30"),
"Data row should contain 'Bob' and '30', got: {}",
data
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_file_sink_tsv_with_header() {
if common::skip_if_no_network() {
return;
}
let harness = common::TestHarness::new().await;
let output_path = harness.output_path("output_with_header.tsv");
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"product": "Widget",
"price": 99
})))
.expect(1..)
.mount(&harness.mock_server)
.await;
let yaml = format!(
r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [file_sink]
sinks:
- id: file_sink
type: file
config:
path: "{}"
format: tsv
include_header: true
append: false
"#,
harness.mock_server.uri(),
output_path.display()
);
let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
let lines = common::wait_for_lines(&output_path, 2, common::default_test_timeout()).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(result.is_ok(), "Engine should shutdown gracefully");
assert!(
lines.len() >= 2,
"File should contain at least 2 lines (header + data), got {}",
lines.len()
);
let header = &lines[0];
assert!(
header.contains("product") && header.contains("price"),
"Header should contain column names 'product' and 'price', got: {}",
header
);
assert!(
header.contains('\t'),
"Header should use tab separators, got: {}",
header
);
let data = &lines[1];
assert!(
data.contains("Widget") && data.contains("99"),
"Data row should contain 'Widget' and '99', got: {}",
data
);
}