#![cfg(all(feature = "http-client", feature = "file"))]
use std::time::Duration;
use wiremock::matchers::{method, path};
use wiremock::{Mock, ResponseTemplate};
mod common;
/// A `filter` step comparing `$.status` with `eq` forwards a matching message
/// to the file sink with its payload intact.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_filter_step_passes_matching() {
    if common::skip_if_no_network() {
        return;
    }

    let harness = common::TestHarness::new().await;
    let output_path = harness.output_path("output.jsonl");

    // Every poll returns the same body, which satisfies the filter condition.
    let body = serde_json::json!({
        "status": "success",
        "value": 42
    });
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(body))
        .expect(1..)
        .mount(&harness.mock_server)
        .await;

    let source_url = harness.mock_server.uri();
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "200ms"
  transforms:
    - id: filter
      inputs: [source]
      outputs: [sink]
      steps:
        - type: filter
          config:
            field: "$.status"
            operator: eq
            value: "success"
  sinks:
    - id: sink
      type: file
      config:
        path: "{}"
"#,
        source_url,
        output_path.display()
    );

    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
    let content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    let _ = shutdown_tx.send(());
    let engine_result = engine_handle.await.expect("Engine task panicked");
    assert!(engine_result.is_ok(), "Engine should shutdown gracefully");

    // Only the first emitted record is inspected; later polls repeat it.
    let first_line = content
        .lines()
        .next()
        .expect("File should contain at least one line");
    let parsed: serde_json::Value = serde_json::from_str(first_line).expect("Invalid JSON");
    let payload = &parsed["payload"];
    assert_eq!(payload["status"], "success");
    assert_eq!(payload["value"], 42);
}
/// A `filter` step drops every message whose `$.status` is not `"success"`, so
/// the file sink never receives anything.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_filter_step_rejects_non_matching() {
    if common::skip_if_no_network() {
        return;
    }
    let harness = common::TestHarness::new().await;
    let output_path = harness.output_path("output.jsonl");
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "status": "error",
            "message": "Something went wrong"
        })))
        .expect(3..)
        .mount(&harness.mock_server)
        .await;
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "200ms"
  transforms:
    - id: filter
      inputs: [source]
      outputs: [sink]
      steps:
        - type: filter
          config:
            field: "$.status"
            operator: eq
            value: "success"
  sinks:
    - id: sink
      type: file
      config:
        path: "{}"
"#,
        harness.mock_server.uri(),
        output_path.display()
    );
    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
    // Shutting down immediately after the second request could race the
    // in-flight responses: a filter that wrongly let them through might not
    // have written anything yet, making this negative check pass vacuously.
    // Waiting for a third poll gives the earlier responses at least one polling
    // interval to reach the sink (assuming the source polls sequentially).
    common::wait_for_requests(&harness.mock_server, 3, Duration::from_secs(5)).await;
    let _ = shutdown_tx.send(());
    let result = engine_handle.await.expect("Engine task panicked");
    assert!(result.is_ok(), "Engine should shutdown gracefully");
    // A missing file means nothing was written, which is the expected outcome.
    // Any other read error must fail the test rather than be mistaken for
    // "no output".
    let content = match std::fs::read_to_string(&output_path) {
        Ok(content) => content,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => String::new(),
        Err(err) => panic!(
            "Failed to read output file {}: {}",
            output_path.display(),
            err
        ),
    };
    assert!(
        content.is_empty(),
        "File should be empty when all messages are filtered out"
    );
}
/// A `remap` step moves `$.id`, `$.name` and `$.nested.value` to new top-level
/// keys. The original `id` and `name` keys must no longer be present.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_remap_step_transforms_fields() {
    if common::skip_if_no_network() {
        return;
    }

    let harness = common::TestHarness::new().await;
    let output_path = harness.output_path("output.jsonl");

    let body = serde_json::json!({
        "id": 123,
        "name": "test item",
        "nested": {
            "value": 456
        }
    });
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(body))
        .expect(1..)
        .mount(&harness.mock_server)
        .await;

    let source_url = harness.mock_server.uri();
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "200ms"
  transforms:
    - id: remap
      inputs: [source]
      outputs: [sink]
      steps:
        - type: remap
          config:
            mappings:
              - from: "$.id"
                to: "$.item_id"
              - from: "$.name"
                to: "$.item_name"
              - from: "$.nested.value"
                to: "$.flat_value"
  sinks:
    - id: sink
      type: file
      config:
        path: "{}"
"#,
        source_url,
        output_path.display()
    );

    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
    let content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    let _ = shutdown_tx.send(());
    let engine_result = engine_handle.await.expect("Engine task panicked");
    assert!(engine_result.is_ok());

    let first_line = content
        .lines()
        .next()
        .expect("File should contain remapped messages");
    let parsed: serde_json::Value = serde_json::from_str(first_line).unwrap();
    let payload = &parsed["payload"];

    // New keys carry the mapped values.
    assert_eq!(payload["item_id"], 123);
    assert_eq!(payload["item_name"], "test item");
    assert_eq!(payload["flat_value"], 456);
    // Source keys were moved, not copied.
    assert!(payload["id"].is_null());
    assert!(payload["name"].is_null());
}
/// Steps in one transform run in order. A `filter` on `$.userId == 1` passes
/// the message, then a `remap` renames its fields.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_multi_step_filter_then_remap() {
    if common::skip_if_no_network() {
        return;
    }

    let harness = common::TestHarness::new().await;
    let output_path = harness.output_path("output.jsonl");

    let body = serde_json::json!({
        "userId": 1,
        "id": 101,
        "title": "Hello World"
    });
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(body))
        .expect(1..)
        .mount(&harness.mock_server)
        .await;

    let source_url = harness.mock_server.uri();
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "200ms"
  transforms:
    - id: pipeline
      inputs: [source]
      outputs: [sink]
      steps:
        - type: filter
          config:
            field: "$.userId"
            operator: eq
            value: 1
        - type: remap
          config:
            mappings:
              - from: "$.id"
                to: "$.post_id"
              - from: "$.title"
                to: "$.post_title"
              - from: "$.userId"
                to: "$.author_id"
  sinks:
    - id: sink
      type: file
      config:
        path: "{}"
"#,
        source_url,
        output_path.display()
    );

    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
    let content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    let _ = shutdown_tx.send(());
    let engine_result = engine_handle.await.expect("Engine task panicked");
    assert!(engine_result.is_ok());

    let first_line = content
        .lines()
        .next()
        .expect("File should contain transformed messages");
    let parsed: serde_json::Value = serde_json::from_str(first_line).unwrap();
    let payload = &parsed["payload"];
    assert_eq!(payload["post_id"], 101);
    assert_eq!(payload["post_title"], "Hello World");
    assert_eq!(payload["author_id"], 1);
}
/// A multi-condition `filter` in `and` mode passes a message that satisfies
/// every condition (`status == "active"` and `priority >= 3`).
///
/// NOTE(review): both conditions hold for this body, so an `or` filter would
/// pass it as well. This test cannot tell the two modes apart; a case where
/// only one condition holds would be needed for that.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_filter_multi_conditions_and() {
    if common::skip_if_no_network() {
        return;
    }

    let harness = common::TestHarness::new().await;
    let output_path = harness.output_path("output.jsonl");

    let body = serde_json::json!({
        "status": "active",
        "priority": 5
    });
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(body))
        .expect(1..)
        .mount(&harness.mock_server)
        .await;

    let source_url = harness.mock_server.uri();
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "200ms"
  transforms:
    - id: filter
      inputs: [source]
      outputs: [sink]
      steps:
        - type: filter
          config:
            mode: and
            conditions:
              - field: "$.status"
                operator: eq
                value: "active"
              - field: "$.priority"
                operator: ge
                value: 3
  sinks:
    - id: sink
      type: file
      config:
        path: "{}"
"#,
        source_url,
        output_path.display()
    );

    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
    let content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    let _ = shutdown_tx.send(());
    let engine_result = engine_handle.await.expect("Engine task panicked");
    assert!(engine_result.is_ok());

    let first_line = content
        .lines()
        .next()
        .expect("Message matching all AND conditions should pass");
    let parsed: serde_json::Value = serde_json::from_str(first_line).unwrap();
    let payload = &parsed["payload"];
    assert_eq!(payload["status"], "active");
    assert_eq!(payload["priority"], 5);
}
/// A multi-condition `filter` in `or` mode passes a message that satisfies
/// only one condition.
///
/// `status` is `"normal"`, so the first condition fails, but
/// `priority >= 10` holds. An `and` filter would therefore reject this message,
/// which lets the test distinguish the two modes.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_filter_multi_conditions_or() {
    if common::skip_if_no_network() {
        return;
    }
    let harness = common::TestHarness::new().await;
    let output_path = harness.output_path("output.jsonl");
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "status": "normal",
            "priority": 15
        })))
        .expect(1..)
        .mount(&harness.mock_server)
        .await;
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "200ms"
  transforms:
    - id: filter
      inputs: [source]
      outputs: [sink]
      steps:
        - type: filter
          config:
            mode: or
            conditions:
              - field: "$.status"
                operator: eq
                value: "critical"
              - field: "$.priority"
                operator: ge
                value: 10
  sinks:
    - id: sink
      type: file
      config:
        path: "{}"
"#,
        harness.mock_server.uri(),
        output_path.display()
    );
    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
    let content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;
    let _ = shutdown_tx.send(());
    let result = engine_handle.await.expect("Engine task panicked");
    assert!(result.is_ok());
    let lines: Vec<&str> = content.lines().collect();
    assert!(
        !lines.is_empty(),
        "Message matching at least one OR condition should pass"
    );
    let parsed: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
    assert_eq!(parsed["payload"]["status"], "normal");
    assert_eq!(parsed["payload"]["priority"], 15);
}
/// The `abs_gt` operator compares the absolute value of a field.
/// `price_change = -5.5` passes a threshold of 3 even though it is negative.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_filter_abs_gt_operator() {
    if common::skip_if_no_network() {
        return;
    }

    let harness = common::TestHarness::new().await;
    let output_path = harness.output_path("output.jsonl");

    let body = serde_json::json!({
        "symbol": "BTC",
        "price_change": -5.5
    });
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(body))
        .expect(1..)
        .mount(&harness.mock_server)
        .await;

    let source_url = harness.mock_server.uri();
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "200ms"
  transforms:
    - id: filter
      inputs: [source]
      outputs: [sink]
      steps:
        - type: filter
          config:
            field: "$.price_change"
            operator: abs_gt
            value: 3
  sinks:
    - id: sink
      type: file
      config:
        path: "{}"
"#,
        source_url,
        output_path.display()
    );

    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
    let content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    let _ = shutdown_tx.send(());
    let engine_result = engine_handle.await.expect("Engine task panicked");
    assert!(engine_result.is_ok());

    let first_line = content
        .lines()
        .next()
        .expect("Message with |price_change| > 3 should pass filter");
    let parsed: serde_json::Value = serde_json::from_str(first_line).unwrap();
    let payload = &parsed["payload"];
    assert_eq!(payload["symbol"], "BTC");
    assert_eq!(payload["price_change"], -5.5);
}
/// The `matches` operator applies a regular expression to a string field. The
/// email `john@example.com` matches `^[a-z]+@example\.com$`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_filter_matches_regex_operator() {
    if common::skip_if_no_network() {
        return;
    }

    let harness = common::TestHarness::new().await;
    let output_path = harness.output_path("output.jsonl");

    let body = serde_json::json!({
        "user": "john_doe",
        "email": "john@example.com"
    });
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(200).set_body_json(body))
        .expect(1..)
        .mount(&harness.mock_server)
        .await;

    // The doubled backslash is a YAML double-quoted escape, so the pattern the
    // filter receives contains a single `\.` (literal dot).
    let source_url = harness.mock_server.uri();
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "200ms"
  transforms:
    - id: filter
      inputs: [source]
      outputs: [sink]
      steps:
        - type: filter
          config:
            field: "$.email"
            operator: matches
            value: "^[a-z]+@example\\.com$"
  sinks:
    - id: sink
      type: file
      config:
        path: "{}"
"#,
        source_url,
        output_path.display()
    );

    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
    let content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;

    let _ = shutdown_tx.send(());
    let engine_result = engine_handle.await.expect("Engine task panicked");
    assert!(engine_result.is_ok());

    let first_line = content
        .lines()
        .next()
        .expect("Message with email matching regex should pass filter");
    let parsed: serde_json::Value = serde_json::from_str(first_line).unwrap();
    let payload = &parsed["payload"];
    assert_eq!(payload["user"], "john_doe");
    assert_eq!(payload["email"], "john@example.com");
}
/// A count-triggered `window` (`size: 3`) with `operation: merge` emits a
/// merged message built from the buffered inputs.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_window_count_trigger_merge() {
    use std::sync::atomic::{AtomicU32, Ordering};

    if common::skip_if_no_network() {
        return;
    }
    let harness = common::TestHarness::new().await;
    let output_path = harness.output_path("output.jsonl");

    // Each poll gets a distinct, increasing `seq`, starting at 1. The responder
    // closure owns the counter outright. `fetch_add` only needs `&self`, so no
    // `Arc` or extra clone is required for the closure to stay `Fn`.
    let counter = AtomicU32::new(1);
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(move |_: &wiremock::Request| {
            let n = counter.fetch_add(1, Ordering::SeqCst);
            ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "seq": n,
                "value": format!("item_{}", n)
            }))
        })
        .expect(3..)
        .mount(&harness.mock_server)
        .await;
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "100ms"
  transforms:
    - id: window
      inputs: [source]
      outputs: [sink]
      steps:
        - type: window
          config:
            size: 3
            operation: merge
  sinks:
    - id: sink
      type: file
      config:
        path: "{}"
"#,
        harness.mock_server.uri(),
        output_path.display()
    );
    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
    let content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;
    let _ = shutdown_tx.send(());
    let result = engine_handle.await.expect("Engine task panicked");
    assert!(result.is_ok(), "Engine should shutdown gracefully");
    let first_line = content
        .lines()
        .next()
        .expect("Window should emit merged message");
    let parsed: serde_json::Value = serde_json::from_str(first_line).expect("Invalid JSON");
    // Exact merged values depend on the merge strategy, so only the shape of
    // the merged payload is checked here.
    assert!(parsed["payload"]["seq"].is_number());
    assert!(parsed["payload"]["value"].is_string());
}
/// A `window` with `operation: select_one` and `strategy: first` emits the
/// first buffered message. The first poll returns `seq == 1`, so that is what
/// should reach the sink.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_window_select_first() {
    use std::sync::atomic::{AtomicU32, Ordering};

    if common::skip_if_no_network() {
        return;
    }
    let harness = common::TestHarness::new().await;
    let output_path = harness.output_path("output.jsonl");

    // Each poll gets a distinct, increasing `seq`, starting at 1. The responder
    // closure owns the counter outright. `fetch_add` only needs `&self`, so no
    // `Arc` or extra clone is required for the closure to stay `Fn`.
    let counter = AtomicU32::new(1);
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(move |_: &wiremock::Request| {
            let n = counter.fetch_add(1, Ordering::SeqCst);
            ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "seq": n
            }))
        })
        .expect(3..)
        .mount(&harness.mock_server)
        .await;
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: source
      type: http_client
      config:
        url: "{}"
        interval: "100ms"
  transforms:
    - id: window
      inputs: [source]
      outputs: [sink]
      steps:
        - type: window
          config:
            size: 3
            operation: select_one
            strategy: first
  sinks:
    - id: sink
      type: file
      config:
        path: "{}"
"#,
        harness.mock_server.uri(),
        output_path.display()
    );
    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
    let content = common::wait_for_file_content(&output_path, Duration::from_secs(5)).await;
    let _ = shutdown_tx.send(());
    let result = engine_handle.await.expect("Engine task panicked");
    assert!(result.is_ok(), "Engine should shutdown gracefully");
    let first_line = content
        .lines()
        .next()
        .expect("Window should emit first message");
    let parsed: serde_json::Value = serde_json::from_str(first_line).expect("Invalid JSON");
    assert_eq!(parsed["payload"]["seq"], 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_window_time_trigger() {
if common::skip_if_no_network() {
return;
}
let harness = common::TestHarness::new().await;
let output_path = harness.output_path("output.jsonl");
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"value": "timer_test"
})))
.expect(1..)
.mount(&harness.mock_server)
.await;
let yaml = format!(
r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: window
inputs: [source]
outputs: [sink]
steps:
- type: window
config:
size: 100
duration: "800ms"
operation: merge
sinks:
- id: sink
type: file
config:
path: "{}"
"#,
harness.mock_server.uri(),
output_path.display()
);
let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;
let content =
common::wait_for_file_content(&output_path, std::time::Duration::from_secs(5)).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(result.is_ok());
let lines: Vec<&str> = content.lines().collect();
assert!(!lines.is_empty(), "Window should emit via time trigger");
let parsed: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
assert_eq!(parsed["payload"]["value"], "timer_test");
}