#![cfg(feature = "daemon")]
mod common;
use std::time::Duration;
use common::{DaemonProcess, SIMPLE_RULE, http_post, poll_until, temp_file};
use wiremock::matchers::method;
use wiremock::{Mock, MockServer, ResponseTemplate};
/// JSON event body that the tests POST to the daemon's `/api/v1/events`
/// endpoint. The tests expect it to trigger `SIMPLE_RULE` (defined in
/// `common`), and one test checks that its `CommandLine` value is rendered
/// into the webhook body.
const MATCHING_EVENT: &str = r#"{"CommandLine":"run malware.exe"}"#;
/// Webhook configuration template with one `slack` webhook, a static
/// `X-Source` header, and a body template that references the rule title and
/// the `CommandLine` field of the detection.
///
/// `__URL__` is a placeholder that each test replaces with the target base URL
/// before writing the config to a temp file.
///
/// NOTE(review): the YAML below carries no nesting indentation as written —
/// confirm the daemon's config parser accepts this layout.
const WEBHOOK_CFG: &str = r#"
webhooks:
- id: slack
kind: detection
url: __URL__/hook
headers:
X-Source: rsigma
body: '{"text":"${detection.rule.title}: ${detection.fields.CommandLine}"}'
"#;
/// Webhook configuration template whose `Authorization` header references the
/// `RSIGMA_TEST_WEBHOOK_TOKEN` environment variable via `${...}` syntax.
///
/// `__URL__` is a placeholder replaced by the test with the mock server's URI.
///
/// NOTE(review): the YAML below carries no nesting indentation as written —
/// confirm the daemon's config parser accepts this layout.
const WEBHOOK_AUTH_CFG: &str = r#"
webhooks:
- id: slack
kind: detection
url: __URL__/hook
headers:
Authorization: "Bearer ${RSIGMA_TEST_WEBHOOK_TOKEN}"
body: '{"text":"${detection.rule.title}"}'
"#;
/// Webhook configuration template with a `scope` entry listing only the
/// `critical` level. The scope test uses it to check that a detection outside
/// that scope produces no webhook request.
///
/// `__URL__` is a placeholder replaced by the test with the mock server's URI.
///
/// NOTE(review): the YAML below carries no nesting indentation as written —
/// confirm the daemon's config parser accepts this layout.
const WEBHOOK_SCOPED_CFG: &str = r#"
webhooks:
- id: slack
kind: detection
url: __URL__/hook
body: '{"text":"${detection.rule.title}"}'
scope:
levels: [critical]
"#;
/// Webhook configuration template for an `internal` webhook that targets
/// `https://127.0.0.1:1/hook`, with a single retry attempt and a `tls` entry
/// naming a CA file.
///
/// `__CA__` is a placeholder replaced by the test with the path of a generated
/// CA certificate in PEM form.
///
/// NOTE(review): the YAML below carries no nesting indentation as written —
/// confirm the daemon's config parser accepts this layout.
const TLS_CFG: &str = r#"
webhooks:
- id: internal
kind: detection
url: https://127.0.0.1:1/hook
body: '{"text":"${detection.rule.title}"}'
retry:
attempts: 1
tls:
ca: __CA__
"#;
/// Webhook configuration template with explicit retry settings: 3 attempts,
/// `100ms` backoff and `300ms` maximum backoff. The short delays keep the
/// retry-exhaustion test within its polling window.
///
/// `__URL__` is a placeholder replaced by the test with the mock server's URI.
///
/// NOTE(review): the YAML below carries no nesting indentation as written —
/// confirm the daemon's config parser accepts this layout.
const WEBHOOK_RETRY_CFG: &str = r#"
webhooks:
- id: slack
kind: detection
url: __URL__/hook
body: '{"text":"${detection.rule.title}"}'
retry:
attempts: 3
backoff: 100ms
max_backoff: 300ms
"#;
/// Builds a current-thread Tokio runtime with all drivers enabled.
///
/// The tests are synchronous and use this runtime to `block_on` the mock
/// server's async API.
///
/// # Panics
/// Panics if the runtime cannot be built.
fn rt() -> tokio::runtime::Runtime {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    builder.build().unwrap()
}
/// Starts a wiremock server that answers every `POST` with `status`.
///
/// * `rt` - runtime used to drive the async server setup.
/// * `status` - HTTP status code returned for each matched request.
///
/// Returns the running [`MockServer`].
fn mock_server(rt: &tokio::runtime::Runtime, status: u16) -> MockServer {
    let setup = async {
        let server = MockServer::start().await;
        let post_only = Mock::given(method("POST"));
        post_only
            .respond_with(ResponseTemplate::new(status))
            .mount(&server)
            .await;
        server
    };
    rt.block_on(setup)
}
/// Returns the number of requests the mock server has recorded so far.
///
/// Yields `0` when the server reports no recording (`received_requests`
/// returned `None`).
fn request_count(rt: &tokio::runtime::Runtime, server: &MockServer) -> usize {
    match rt.block_on(server.received_requests()) {
        Some(recorded) => recorded.len(),
        None => 0,
    }
}
/// Posts a matching event and checks the first request received by the mock
/// server: it must carry the static `X-Source` header and a JSON body whose
/// `text` is rendered from the rule title and the event's `CommandLine`.
#[test]
fn webhook_delivers_rendered_body_and_headers() {
    let runtime = rt();
    let server = mock_server(&runtime, 200);
    let webhook_yaml = WEBHOOK_CFG.replace("__URL__", &server.uri());
    let webhook = temp_file(".yml", &webhook_yaml);
    let rule = temp_file(".yml", SIMPLE_RULE);
    let rule_path = rule.path().to_str().unwrap();
    let webhook_path = webhook.path().to_str().unwrap();
    let daemon = DaemonProcess::spawn_http_with_args(rule_path, &["--webhook", webhook_path]);

    let events_url = daemon.url("/api/v1/events");
    let (code, _body) = http_post(&events_url, MATCHING_EVENT);
    assert_eq!(code, 200);

    // Delivery is asynchronous, so poll until the first request shows up.
    let first_request = poll_until(Duration::from_secs(5), || {
        runtime
            .block_on(server.received_requests())
            .and_then(|recorded| recorded.into_iter().next())
    });
    let req = first_request.expect("webhook never made a request");

    let source_header = req.headers.get("x-source").and_then(|v| v.to_str().ok());
    assert_eq!(
        source_header,
        Some("rsigma"),
        "the static header should be sent",
    );

    let body: serde_json::Value =
        serde_json::from_slice(&req.body).expect("rendered body must be valid JSON");
    assert_eq!(
        body["text"], "Test Rule: run malware.exe",
        "the body should interpolate the rule title and matched field",
    );
}
/// With the mock server answering 400, the delivery must land in the DLQ file
/// (mentioning the webhook id and the status) and the server must have seen
/// exactly one request, i.e. no retry happened.
#[test]
fn webhook_permanent_4xx_routes_to_dlq() {
    let runtime = rt();
    let server = mock_server(&runtime, 400);
    let webhook_yaml = WEBHOOK_CFG.replace("__URL__", &server.uri());
    let webhook = temp_file(".yml", &webhook_yaml);
    let rule = temp_file(".yml", SIMPLE_RULE);
    let dlq = temp_file(".ndjson", "");
    let dlq_spec = format!("file://{}", dlq.path().display());
    let rule_path = rule.path().to_str().unwrap();
    let webhook_path = webhook.path().to_str().unwrap();
    let daemon = DaemonProcess::spawn_http_with_args(
        rule_path,
        &["--webhook", webhook_path, "--dlq", dlq_spec.as_str()],
    );

    let events_url = daemon.url("/api/v1/events");
    let (code, _body) = http_post(&events_url, MATCHING_EVENT);
    assert_eq!(code, 200);

    // Wait for a DLQ entry that names both the webhook and the HTTP status.
    let dlq_entry = poll_until(Duration::from_secs(10), || {
        let contents = std::fs::read_to_string(dlq.path()).unwrap_or_default();
        if contents.contains("webhook slack") && contents.contains("HTTP 400") {
            Some(())
        } else {
            None
        }
    });
    assert!(
        dlq_entry.is_some(),
        "a permanent 4xx should route to the DLQ with the webhook id and status",
    );

    let attempts = request_count(&runtime, &server);
    assert_eq!(attempts, 1, "permanent 4xx must not retry");
}
/// With the mock server answering 500 and the retry config in place, the
/// delivery must end up in the DLQ file after the server has received at
/// least two requests.
#[test]
fn webhook_retries_then_exhausts_to_dlq() {
    let runtime = rt();
    let server = mock_server(&runtime, 500);
    let webhook_yaml = WEBHOOK_RETRY_CFG.replace("__URL__", &server.uri());
    let webhook = temp_file(".yml", &webhook_yaml);
    let rule = temp_file(".yml", SIMPLE_RULE);
    let dlq = temp_file(".ndjson", "");
    let dlq_spec = format!("file://{}", dlq.path().display());
    let rule_path = rule.path().to_str().unwrap();
    let webhook_path = webhook.path().to_str().unwrap();
    let daemon = DaemonProcess::spawn_http_with_args(
        rule_path,
        &["--webhook", webhook_path, "--dlq", dlq_spec.as_str()],
    );

    let events_url = daemon.url("/api/v1/events");
    let (code, _body) = http_post(&events_url, MATCHING_EVENT);
    assert_eq!(code, 200);

    // Wait for a DLQ entry that names both the webhook and the HTTP status.
    let dlq_entry = poll_until(Duration::from_secs(10), || {
        let contents = std::fs::read_to_string(dlq.path()).unwrap_or_default();
        if contents.contains("webhook slack") && contents.contains("HTTP 500") {
            Some(())
        } else {
            None
        }
    });
    assert!(
        dlq_entry.is_some(),
        "a retryable 5xx should exhaust retries into the DLQ"
    );

    let attempts = request_count(&runtime, &server);
    assert!(
        attempts >= 2,
        "the webhook should have retried at least once before the DLQ",
    );
}
/// Uses the `critical`-scoped webhook config alongside a file output sink.
/// Once the detection is visible in the output file, the mock server must
/// still have received no request.
#[test]
fn webhook_scope_filter_skips_out_of_scope() {
    let runtime = rt();
    let server = mock_server(&runtime, 200);
    let webhook_yaml = WEBHOOK_SCOPED_CFG.replace("__URL__", &server.uri());
    let webhook = temp_file(".yml", &webhook_yaml);
    let rule = temp_file(".yml", SIMPLE_RULE);
    let out = temp_file(".ndjson", "");
    let out_spec = format!("file://{}", out.path().display());
    let rule_path = rule.path().to_str().unwrap();
    let webhook_path = webhook.path().to_str().unwrap();
    let daemon = DaemonProcess::spawn_http_with_args(
        rule_path,
        &["--webhook", webhook_path, "--output", out_spec.as_str()],
    );

    let events_url = daemon.url("/api/v1/events");
    let (code, _body) = http_post(&events_url, MATCHING_EVENT);
    assert_eq!(code, 200);

    // The file sink is the signal that the detection was actually produced.
    let landed = poll_until(Duration::from_secs(5), || {
        let sink = std::fs::read_to_string(out.path()).unwrap_or_default();
        sink.contains("Test Rule").then_some(())
    });
    assert!(landed.is_some(), "detection should reach the file sink");

    // Grace period so a (wrongly) dispatched webhook request would have arrived.
    std::thread::sleep(Duration::from_millis(300));

    let hits = request_count(&runtime, &server);
    assert_eq!(
        hits, 0,
        "a webhook scoped to critical must not fire for a high-severity detection",
    );
}
/// Spawns the daemon with `RSIGMA_TEST_WEBHOOK_TOKEN` set and checks that the
/// first webhook request carries `Authorization: Bearer secret-xyz`.
#[test]
fn webhook_env_header_is_interpolated() {
    let runtime = rt();
    let server = mock_server(&runtime, 200);
    let webhook_yaml = WEBHOOK_AUTH_CFG.replace("__URL__", &server.uri());
    let webhook = temp_file(".yml", &webhook_yaml);
    let rule = temp_file(".yml", SIMPLE_RULE);
    let rule_path = rule.path().to_str().unwrap();
    let webhook_path = webhook.path().to_str().unwrap();
    let daemon = DaemonProcess::spawn_http_with_args_env(
        rule_path,
        &["--webhook", webhook_path],
        &[("RSIGMA_TEST_WEBHOOK_TOKEN", "secret-xyz")],
    );

    let events_url = daemon.url("/api/v1/events");
    let (code, _body) = http_post(&events_url, MATCHING_EVENT);
    assert_eq!(code, 200);

    // Delivery is asynchronous, so poll until the first request shows up.
    let first_request = poll_until(Duration::from_secs(5), || {
        runtime
            .block_on(server.received_requests())
            .and_then(|recorded| recorded.into_iter().next())
    });
    let req = first_request.expect("webhook never made a request");

    let auth_header = req
        .headers
        .get("authorization")
        .and_then(|v| v.to_str().ok());
    assert_eq!(
        auth_header,
        Some("Bearer secret-xyz"),
        "the env-var header secret should be interpolated at render time",
    );
}
/// Generates a self-signed CA, points the TLS webhook config at it, and
/// checks that the delivery to `https://127.0.0.1:1/hook` ends up in the DLQ
/// file under the `internal` webhook id.
#[test]
fn webhook_tls_with_ca_routes_unreachable_to_dlq() {
    use rcgen::{BasicConstraints, CertificateParams, IsCa, KeyPair};

    // Throwaway CA certificate, written out as PEM for the `tls.ca` setting.
    let mut ca_params = CertificateParams::new(Vec::<String>::new()).unwrap();
    ca_params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
    let ca_key = KeyPair::generate().unwrap();
    let ca_cert = ca_params.self_signed(&ca_key).unwrap();
    let ca_pem = ca_cert.pem();
    let ca = temp_file(".pem", &ca_pem);

    let ca_path = ca.path().display().to_string();
    let cfg = TLS_CFG.replace("__CA__", &ca_path);
    let webhook = temp_file(".yml", &cfg);
    let rule = temp_file(".yml", SIMPLE_RULE);
    let dlq = temp_file(".ndjson", "");
    let dlq_spec = format!("file://{}", dlq.path().display());
    let rule_path = rule.path().to_str().unwrap();
    let webhook_path = webhook.path().to_str().unwrap();
    let daemon = DaemonProcess::spawn_http_with_args(
        rule_path,
        &["--webhook", webhook_path, "--dlq", dlq_spec.as_str()],
    );

    let events_url = daemon.url("/api/v1/events");
    let (code, _body) = http_post(&events_url, MATCHING_EVENT);
    assert_eq!(code, 200);

    let dlq_entry = poll_until(Duration::from_secs(10), || {
        let contents = std::fs::read_to_string(dlq.path()).unwrap_or_default();
        contents.contains("webhook internal").then_some(())
    });
    assert!(
        dlq_entry.is_some(),
        "an https webhook with a custom CA should build TLS and route the unreachable delivery to the DLQ",
    );
}
/// Configures a webhook pointing at `http://127.0.0.1:1` next to a file
/// output sink and checks that the detection still reaches the output file.
#[test]
fn webhook_does_not_block_file_sink_under_fanout() {
    let webhook_yaml = WEBHOOK_CFG.replace("__URL__", "http://127.0.0.1:1");
    let webhook = temp_file(".yml", &webhook_yaml);
    let rule = temp_file(".yml", SIMPLE_RULE);
    let out = temp_file(".ndjson", "");
    let out_spec = format!("file://{}", out.path().display());
    let rule_path = rule.path().to_str().unwrap();
    let webhook_path = webhook.path().to_str().unwrap();
    let daemon = DaemonProcess::spawn_http_with_args(
        rule_path,
        &["--webhook", webhook_path, "--output", out_spec.as_str()],
    );

    let events_url = daemon.url("/api/v1/events");
    let (code, _body) = http_post(&events_url, MATCHING_EVENT);
    assert_eq!(code, 200);

    let landed = poll_until(Duration::from_secs(5), || {
        let sink = std::fs::read_to_string(out.path()).unwrap_or_default();
        sink.contains("Test Rule").then_some(())
    });
    assert!(
        landed.is_some(),
        "an unreachable webhook must not block the file sink behind it",
    );
}