#![cfg(feature = "http-client")]
mod common;
use common::{default_test_timeout, run_engine, wait_for_requests};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// End-to-end check of an `http_client` source wired through to a `console` sink.
///
/// A wiremock server answers `GET /api/health` with status 200 and a JSON
/// health document. The engine is started from an inline YAML pipeline that
/// points the source at that endpoint with a `100ms` interval. Once
/// `wait_for_requests` returns, the shutdown signal is sent and the engine's
/// result must be `Ok`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_api_monitoring_pipeline() {
    if common::skip_if_no_network() {
        return;
    }

    let server = MockServer::start().await;

    // Canned body served by the mocked health endpoint.
    let health_payload = serde_json::json!({
        "status": "healthy",
        "metrics": {
            "cpu_usage": 45.2,
            "memory_mb": 1024,
            "requests_per_sec": 150
        },
        "timestamp": "2026-01-07T12:00:00Z"
    });
    let health_reply = ResponseTemplate::new(200).set_body_json(&health_payload);

    Mock::given(method("GET"))
        .and(path("/api/health"))
        .respond_with(health_reply)
        // The mock expects one or more matching requests.
        .expect(1..)
        .mount(&server)
        .await;

    // NOTE(review): the YAML keys below are written flush-left (no nesting
    // indentation) — confirm this is what the config parser is meant to receive.
    let base_url = server.uri();
    let pipeline_yaml = format!(
        r#"
pipeline:
sources:
- id: health_monitor
type: http_client
config:
url: "{}/api/health"
interval: "100ms"
transforms:
- id: pass_through
inputs: [health_monitor]
outputs: [console_out]
sinks:
- id: console_out
type: console
"#,
        base_url
    );

    let (stop_tx, engine_task) = run_engine(&pipeline_yaml).await;

    // Presumably blocks until the mock has seen 1 request or the timeout
    // elapses — verify against the helper in `common`.
    wait_for_requests(&server, 1, default_test_timeout()).await;

    // A failed send is deliberately ignored; the assertion below is the check.
    let _ = stop_tx.send(());

    let joined = engine_task.await;
    let outcome = joined.expect("Engine task panicked");
    assert!(outcome.is_ok(), "Engine should shutdown gracefully");
}
/// Checks that the engine keeps polling across several cycles and still
/// shuts down with an `Ok` result.
///
/// A wiremock server answers `GET /` with status 200 and `{"poll": "ok"}`.
/// The engine runs an inline YAML pipeline whose `http_client` source targets
/// the server root with a `100ms` interval. The test calls `wait_for_requests`
/// with a count of 3 before sending the shutdown signal.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_multi_cycle_polling_stability() {
    if common::skip_if_no_network() {
        return;
    }

    let server = MockServer::start().await;

    let poll_reply = ResponseTemplate::new(200).set_body_json(serde_json::json!({"poll": "ok"}));

    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(poll_reply)
        // The mock expects three or more matching requests.
        .expect(3..)
        .mount(&server)
        .await;

    // NOTE(review): the YAML keys below are written flush-left (no nesting
    // indentation) — confirm this is what the config parser is meant to receive.
    let base_url = server.uri();
    let pipeline_yaml = format!(
        r#"
pipeline:
sources:
- id: poller
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [poller]
outputs: [console_out]
sinks:
- id: console_out
type: console
"#,
        base_url
    );

    let (stop_tx, engine_task) = run_engine(&pipeline_yaml).await;

    // Presumably blocks until the mock has seen 3 requests or the timeout
    // elapses — verify against the helper in `common`.
    wait_for_requests(&server, 3, default_test_timeout()).await;

    // A failed send is deliberately ignored; the assertion below is the check.
    let _ = stop_tx.send(());

    let joined = engine_task.await;
    let outcome = joined.expect("Engine task panicked");
    assert!(
        outcome.is_ok(),
        "Engine should remain stable over multiple cycles"
    );
}