#![cfg(all(feature = "http-server", feature = "file"))]
mod common;
use common::{
TestHarness, allocate_local_port, default_test_timeout, post_json_with_retry,
wait_for_file_content,
};
use serde_json::json;
/// End-to-end check that a JSON body POSTed to an `http_server` source ends up
/// in the output of a `file` sink.
///
/// The pipeline is wired as `webhook` (source) -> `pass_through` (transform)
/// -> `out` (file sink). The test returns early, asserting nothing, when
/// `common::skip_if_no_network()` reports true.
#[tokio::test]
async fn test_http_server_source_emits_payload() {
    if common::skip_if_no_network() {
        return;
    }

    let harness = TestHarness::new().await;
    let output_path = harness.output_path("http_server.jsonl");

    // The same `host:port` string is used for the server bind address and for
    // the client URL, so the two cannot drift apart.
    let port = allocate_local_port();
    let bind = format!("127.0.0.1:{}", port);
    let url = format!("http://{}/ingest", bind);

    // YAML block indentation is significant: `sources`, `transforms` and
    // `sinks` must nest under `pipeline`, and each node's `type`/`config`
    // must nest under its list item. Without the nesting those keys collide
    // at a single level. The raw string therefore starts at column 0.
    let config = format!(
        r#"
pipeline:
  sources:
    - id: webhook
      type: http_server
      config:
        bind: "{bind}"
        path: "/ingest"
  transforms:
    - id: pass_through
      inputs: [webhook]
      outputs: [out]
  sinks:
    - id: out
      type: file
      config:
        path: "{output_path}"
"#,
        bind = bind,
        output_path = output_path.display()
    );

    let (shutdown_tx, handle) = harness.run_pipeline(&config).await;

    // No credentials are passed (third argument is `None`); this source has
    // no `auth` section configured.
    let payload = json!({"hello": "world"});
    let response = post_json_with_retry(&url, &payload, None).await;
    assert!(response.status().is_success());

    // The first line of the sink file must deserialize into a message that
    // is attributed to the `webhook` node and carries the posted JSON.
    let content = wait_for_file_content(&output_path, default_test_timeout()).await;
    let line = content.lines().next().expect("Expected a line in output");
    let message: pipeflow::Message = serde_json::from_str(line).expect("Invalid JSON output");
    assert_eq!(message.meta.source_node, "webhook");
    assert_eq!(message.payload, payload);

    // Best-effort teardown: both results are deliberately ignored so that a
    // pipeline which already stopped, or is slow to stop, does not fail a
    // test whose assertions have all passed.
    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(default_test_timeout(), handle).await;
}
/// End-to-end check that an `http_server` source configured with bearer auth
/// rejects a request without a token and accepts one with the configured
/// token.
///
/// The rejected and the accepted request carry *different* JSON bodies, so
/// the final payload assertion also proves that the rejected body was not
/// emitted ahead of the accepted one. The test returns early, asserting
/// nothing, when `common::skip_if_no_network()` reports true.
#[tokio::test]
async fn test_http_server_source_bearer_auth() {
    if common::skip_if_no_network() {
        return;
    }

    let harness = TestHarness::new().await;
    let output_path = harness.output_path("http_server_auth.jsonl");

    // The same `host:port` string is used for the server bind address and for
    // the client URL, so the two cannot drift apart.
    let port = allocate_local_port();
    let bind = format!("127.0.0.1:{}", port);
    let url = format!("http://{}/secure", bind);

    // YAML block indentation is significant: `auth` must nest under the
    // source's `config`, and `type`/`token` under `auth`. Without the nesting
    // the repeated `type`/`config`/`path` keys collide at a single level.
    // The raw string therefore starts at column 0.
    let config = format!(
        r#"
pipeline:
  sources:
    - id: webhook
      type: http_server
      config:
        bind: "{bind}"
        path: "/secure"
        auth:
          type: bearer
          token: "secret-token"
  transforms:
    - id: pass_through
      inputs: [webhook]
      outputs: [out]
  sinks:
    - id: out
      type: file
      config:
        path: "{output_path}"
"#,
        bind = bind,
        output_path = output_path.display()
    );

    let (shutdown_tx, handle) = harness.run_pipeline(&config).await;

    // Request without credentials: must be answered with 401. It uses its
    // own body so that it can be told apart from the accepted request if it
    // were ever to reach the sink.
    let rejected_payload = json!({"hello": "unauthenticated"});
    let response = post_json_with_retry(&url, &rejected_payload, None).await;
    assert_eq!(response.status(), reqwest::StatusCode::UNAUTHORIZED);

    // Request with the configured token (presumably sent by the helper as a
    // bearer credential -- the helper is defined in `common`): must succeed.
    let payload = json!({"hello": "secure"});
    let response = post_json_with_retry(&url, &payload, Some("secret-token")).await;
    assert!(response.status().is_success());

    // The first line of the sink file must be the accepted message. Were the
    // rejected request forwarded, its distinct body would be found here
    // instead and the payload assertion would fail.
    let content = wait_for_file_content(&output_path, default_test_timeout()).await;
    let line = content.lines().next().expect("Expected a line in output");
    let message: pipeflow::Message = serde_json::from_str(line).expect("Invalid JSON output");
    assert_eq!(message.meta.source_node, "webhook");
    assert_eq!(message.payload, payload);

    // Best-effort teardown: both results are deliberately ignored so that a
    // pipeline which already stopped, or is slow to stop, does not fail a
    // test whose assertions have all passed.
    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(default_test_timeout(), handle).await;
}