#![cfg(feature = "http-client")]
mod common;
use common::run_engine;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// Checks that the engine survives an HTTP source whose endpoint only ever
/// answers with status 500.
///
/// A mock server is set up to reply `500` to every `GET /` and to expect at
/// least one such request. The pipeline (http_client source -> pass_through
/// transform -> file sink) runs for 500 ms, then a shutdown signal is sent.
/// The engine must finish with `Ok`, and if the sink file was created at all
/// it must contain nothing.
///
/// The test returns early without asserting anything when
/// `common::skip_if_no_network()` reports `true`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[cfg(feature = "file")]
async fn test_http_source_500_error() {
    use std::time::Duration;

    if common::skip_if_no_network() {
        return;
    }

    let server = MockServer::start().await;
    let scratch_dir = tempfile::tempdir().unwrap();
    let sink_file = scratch_dir.path().join("output.jsonl");

    // Every `GET /` gets a 500 back; `expect(1..)` requires at least one hit.
    let always_500 = Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(500))
        .expect(1..);
    always_500.mount(&server).await;

    // The raw string below is runtime configuration: keep it byte-for-byte.
    let pipeline_yaml = format!(
        r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [sink]
sinks:
- id: sink
type: file
config:
path: "{}"
"#,
        server.uri(),
        sink_file.display()
    );

    let (stop_signal, engine_task) = run_engine(&pipeline_yaml).await;

    // Let the engine run for a while before asking it to stop.
    // NOTE(review): presumably long enough for several polls at the
    // configured "100ms" interval — confirm against the source implementation.
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Best-effort shutdown: the send result is deliberately ignored.
    let _ = stop_signal.send(());

    let outcome = engine_task.await.expect("Engine task panicked");
    assert!(
        outcome.is_ok(),
        "Engine should gracefully handle HTTP errors"
    );

    // The sink may never have created its file; only inspect it if it exists.
    if !sink_file.exists() {
        return;
    }
    let written = std::fs::read_to_string(&sink_file).unwrap();
    assert!(written.is_empty(), "File should be empty on source error");
}
/// Checks that the engine survives an HTTP source whose endpoint only ever
/// answers with status 404.
///
/// A mock server is set up to reply `404` to every `GET /` and to expect at
/// least one such request. The pipeline (http_client source -> pass_through
/// transform -> console sink) runs for 500 ms, then a shutdown signal is
/// sent. The engine must finish with `Ok`.
///
/// The test returns early without asserting anything when
/// `common::skip_if_no_network()` reports `true`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_source_404_error() {
    use std::time::Duration;

    if common::skip_if_no_network() {
        return;
    }

    let mock_server = MockServer::start().await;

    // Every `GET /` gets a 404 back; `expect(1..)` requires at least one hit.
    Mock::given(method("GET"))
        .and(path("/"))
        .respond_with(ResponseTemplate::new(404))
        .expect(1..)
        .mount(&mock_server)
        .await;

    // The raw string below is runtime configuration: keep it byte-for-byte.
    let yaml = format!(
        r#"
pipeline:
sources:
- id: source
type: http_client
config:
url: "{}"
interval: "100ms"
transforms:
- id: pass_through
inputs: [source]
outputs: [console]
sinks:
- id: console
type: console
"#,
        mock_server.uri()
    );

    let (shutdown_tx, engine_handle) = run_engine(&yaml).await;

    // Let the engine run for a while before asking it to stop.
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Best-effort shutdown: the send result is deliberately ignored.
    let _ = shutdown_tx.send(());

    let result = engine_handle.await.expect("Engine task panicked");
    // A failure message keeps this test's diagnostics in line with the
    // sibling 500-error test instead of a bare "assertion failed".
    assert!(
        result.is_ok(),
        "Engine should gracefully handle HTTP 404 responses"
    );
}