mod common;
use common::{default_test_timeout, run_engine, wait_for_file_content};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[cfg(all(feature = "file", feature = "http-client"))]
async fn test_sink_failure_routes_to_dlq() {
if common::skip_if_no_network() {
return;
}
let source_server = MockServer::start().await;
let sink_server = MockServer::start().await;
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let dlq_path = temp_dir.path().join("dlq.jsonl");
Mock::given(method("GET"))
.and(path("/data"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"status": "bad_sink",
"value": 7
})))
.expect(1..)
.mount(&source_server)
.await;
Mock::given(method("POST"))
.and(path("/webhook"))
.respond_with(ResponseTemplate::new(500).set_body_string("Sink error"))
.expect(1..)
.mount(&sink_server)
.await;
let yaml = format!(
r#"
system:
sinks:
dlq: "{dlq_path}"
pipeline:
sources:
- id: src
type: http_client
config:
url: "{source_url}/data"
interval: "100ms"
transforms:
- id: pass
inputs: [src]
outputs: [sink]
sinks:
- id: sink
type: http_client
config:
url: "{sink_url}/webhook"
method: POST
"#,
dlq_path = dlq_path.display(),
source_url = source_server.uri(),
sink_url = sink_server.uri()
);
let (shutdown_tx, engine_handle) = run_engine(&yaml).await;
let content = wait_for_file_content(&dlq_path, default_test_timeout()).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(result.is_ok(), "Engine should run despite sink failures");
let line = content.lines().next().expect("Expected a DLQ line");
let parsed: serde_json::Value = serde_json::from_str(line).expect("Valid DLQ JSON");
let original_payload = &parsed["payload"]["payload"];
assert_eq!(original_payload["status"], "bad_sink");
assert_eq!(original_payload["value"], 7);
}