#![cfg(all(feature = "notify", feature = "http-server"))]
mod common;
use common::{
TestHarness, allocate_local_port, default_test_timeout, post_json_with_retry,
skip_if_no_network, wait_for_requests,
};
use pipeflow::common::types::{Notify, Severity};
use serde_json::Value;
use std::time::Duration;
use wiremock::matchers::{method, path};
use wiremock::{Mock, ResponseTemplate};
/// End-to-end check of the `telegram` notify provider.
///
/// Runs a pipeline whose `http_server` source feeds a `notify` sink that targets the
/// harness mock server, posts one serialized [`Notify`] to the source, and asserts on the
/// JSON body of the first request the mock server received.
///
/// Returns early, asserting nothing, when `skip_if_no_network()` reports true.
#[tokio::test]
async fn notify_telegram_e2e() {
    if skip_if_no_network() {
        return;
    }

    let harness = TestHarness::new().await;
    let server = &harness.mock_server;
    // The mocked path embeds the `bot_token` value configured in the YAML below.
    Mock::given(method("POST"))
        .and(path("/botTEST_TOKEN/sendMessage"))
        .respond_with(ResponseTemplate::new(200))
        .mount(server)
        .await;

    let port = allocate_local_port();
    let api_base_url = server.uri();
    // `{{{{ ... }}}}` is `format!` brace escaping: the rendered YAML contains
    // `{{ $.message }}`.
    // NOTE(review): the YAML nesting was reconstructed from key order — confirm it
    // against the pipeline config schema.
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: http_source
      type: http_server
      config:
        bind: "127.0.0.1:{port}"
  transforms:
    - id: passthrough
      inputs: [http_source]
      outputs: [notify_telegram]
      steps: []
  sinks:
    - id: notify_telegram
      type: notify
      config:
        provider: telegram
        bot_token: "TEST_TOKEN"
        chat_id: "12345"
        api_base_url: "{api_base_url}"
        parse_mode: "HTML"
        disable_web_page_preview: true
        message: "Alert: {{{{ $.message }}}}"
"#
    );
    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;

    let notify = Notify::new("service_down", Severity::Critical, "db unreachable");
    let payload = serde_json::to_value(&notify).expect("Failed to serialize notify");
    post_json_with_retry(&format!("http://127.0.0.1:{port}"), &payload, None).await;
    wait_for_requests(server, 1, default_test_timeout()).await;

    let requests = server
        .received_requests()
        .await
        .expect("Failed to read requests");
    // Guard the `requests[0]` index below with a readable failure message.
    assert!(
        !requests.is_empty(),
        "Expected at least one telegram request"
    );

    let body: Value =
        serde_json::from_slice(&requests[0].body).expect("Telegram body should be JSON");
    assert_eq!(body["chat_id"], "12345");
    assert_eq!(body["text"], "Alert: db unreachable");
    assert_eq!(body["parse_mode"], "HTML");
    assert_eq!(body["disable_web_page_preview"], true);

    // Best-effort teardown: the send result and the join/timeout outcome are
    // deliberately ignored.
    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(default_test_timeout(), engine_handle).await;
}
/// End-to-end check of the `webhook` notify provider with `body: full`.
///
/// Runs a pipeline whose `http_server` source feeds a `notify` sink that posts to
/// `/notify` on the harness mock server, sends one serialized [`Notify`], and asserts
/// that the first received request carries the original message under
/// `payload.message` and the rendered template under `text`.
///
/// Returns early, asserting nothing, when `skip_if_no_network()` reports true.
#[tokio::test]
async fn notify_webhook_e2e() {
    if skip_if_no_network() {
        return;
    }

    let harness = TestHarness::new().await;
    let server = &harness.mock_server;
    Mock::given(method("POST"))
        .and(path("/notify"))
        .respond_with(ResponseTemplate::new(200))
        .mount(server)
        .await;

    let port = allocate_local_port();
    let webhook_base = server.uri();
    // `{{{{ ... }}}}` is `format!` brace escaping: the rendered YAML contains
    // `{{ $.message }}`.
    // NOTE(review): the YAML nesting was reconstructed from key order — confirm it
    // against the pipeline config schema.
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: http_source
      type: http_server
      config:
        bind: "127.0.0.1:{port}"
  transforms:
    - id: passthrough
      inputs: [http_source]
      outputs: [notify_webhook]
      steps: []
  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        url: "{webhook_base}/notify"
        body: full
        message: "Alert: {{{{ $.message }}}}"
"#
    );
    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;

    let notify = Notify::new("disk_full", Severity::Error, "disk at 95%");
    let payload = serde_json::to_value(&notify).expect("Failed to serialize notify");
    post_json_with_retry(&format!("http://127.0.0.1:{port}"), &payload, None).await;
    wait_for_requests(server, 1, default_test_timeout()).await;

    let requests = server
        .received_requests()
        .await
        .expect("Failed to read requests");
    // Guard the `requests[0]` index below with a readable failure message.
    assert!(
        !requests.is_empty(),
        "Expected at least one webhook request"
    );

    let body: Value =
        serde_json::from_slice(&requests[0].body).expect("Webhook body should be JSON");
    assert_eq!(body["payload"]["message"], "disk at 95%");
    assert_eq!(body["text"], "Alert: disk at 95%");

    // Best-effort teardown: the send result and the join/timeout outcome are
    // deliberately ignored.
    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(default_test_timeout(), engine_handle).await;
}
/// End-to-end check of notification silencing with the `redis` backend.
///
/// Posts the same [`Notify`] three times against a pipeline configured with a `1s`
/// silence window keyed on `$.name`:
///
/// 1. the first post must reach the mock webhook;
/// 2. an immediate second post must not add a request (checked after a short pause);
/// 3. a third post, sent after sleeping two seconds, must bring the total to two.
///
/// Skipped when `skip_if_no_network()` reports true or when `REDIS_URL` is not set.
#[cfg(feature = "redis")]
#[tokio::test]
async fn notify_silence_redis_e2e() {
    if skip_if_no_network() {
        return;
    }
    let Ok(redis_url) = std::env::var("REDIS_URL") else {
        eprintln!("REDIS_URL not set; skipping redis notify silence test");
        return;
    };

    let harness = TestHarness::new().await;
    let server = &harness.mock_server;
    Mock::given(method("POST"))
        .and(path("/notify"))
        .respond_with(ResponseTemplate::new(200))
        .mount(server)
        .await;

    // A fresh UUID per run goes into the Redis key prefix handed to the config.
    let key_prefix = format!("pipeflow:test:silence:{}:", uuid::Uuid::now_v7());
    let port = allocate_local_port();
    let webhook_base = server.uri();
    // `{{{{ ... }}}}` is `format!` brace escaping: the rendered YAML contains
    // `{{ $.name }}`.
    // NOTE(review): the YAML nesting was reconstructed from key order; in particular
    // `key` and `redis` are assumed to sit under `silence` — confirm against the
    // config schema.
    let yaml = format!(
        r#"
system:
  notify:
    silence:
      window: 1s
      backend: redis
      key: "{{{{ $.name }}}}"
      redis:
        url: "{redis_url}"
        key_prefix: "{key_prefix}"
pipeline:
  sources:
    - id: http_source
      type: http_server
      config:
        bind: "127.0.0.1:{port}"
  transforms:
    - id: passthrough
      inputs: [http_source]
      outputs: [notify_webhook]
      steps: []
  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        url: "{webhook_base}/notify"
        body: full
"#
    );
    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;

    let notify = Notify::new("price_alert", Severity::Warning, "price high");
    // Borrow instead of cloning: `notify` is not used after serialization.
    let payload = serde_json::to_value(&notify).expect("Failed to serialize notify");
    let endpoint = format!("http://127.0.0.1:{port}");

    // First notification: expected to be delivered.
    post_json_with_retry(&endpoint, &payload, None).await;
    wait_for_requests(server, 1, default_test_timeout()).await;

    // Duplicate sent right away: pause briefly so a wrongly delivered request would
    // have time to show up before counting.
    post_json_with_retry(&endpoint, &payload, None).await;
    tokio::time::sleep(Duration::from_millis(200)).await;
    let received = server
        .received_requests()
        .await
        .expect("Failed to read requests")
        .len();
    assert_eq!(received, 1, "expected silence to suppress duplicate");

    // Sleep longer than the configured 1s window, then send again.
    tokio::time::sleep(Duration::from_secs(2)).await;
    post_json_with_retry(&endpoint, &payload, None).await;
    wait_for_requests(server, 2, default_test_timeout()).await;
    // Assert the count explicitly rather than relying on how `wait_for_requests`
    // behaves on timeout.
    let delivered = server
        .received_requests()
        .await
        .expect("Failed to read requests")
        .len();
    assert_eq!(
        delivered, 2,
        "expected notification to be delivered after the silence window elapsed"
    );

    // Best-effort teardown: the send result and the join/timeout outcome are
    // deliberately ignored.
    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(default_test_timeout(), engine_handle).await;
}
/// End-to-end check that a `Severity::Warning` notification is delivered when
/// `bypass_severity: warning` is configured alongside an active window of
/// `"00:00"`–`"00:01"`.
///
/// Posts one serialized [`Notify`] to the `http_server` source and asserts that the
/// mock webhook received exactly one request.
///
/// Returns early, asserting nothing, when `skip_if_no_network()` reports true.
#[tokio::test]
async fn notify_active_window_bypass_severity_e2e() {
    if skip_if_no_network() {
        return;
    }

    let harness = TestHarness::new().await;
    let server = &harness.mock_server;
    Mock::given(method("POST"))
        .and(path("/notify"))
        .respond_with(ResponseTemplate::new(200))
        .mount(server)
        .await;

    let port = allocate_local_port();
    let webhook_base = server.uri();
    // NOTE(review): the YAML nesting was reconstructed from key order;
    // `bypass_severity` is assumed to belong to `active_window` rather than being a
    // sibling under `notify` — confirm against the config schema.
    // NOTE(review): presumably notifications outside the window are dropped unless
    // their severity is at least `bypass_severity` — confirm.
    let yaml = format!(
        r#"
system:
  notify:
    active_window:
      start: "00:00"
      end: "00:01"
      bypass_severity: warning
pipeline:
  sources:
    - id: http_source
      type: http_server
      config:
        bind: "127.0.0.1:{port}"
  transforms:
    - id: passthrough
      inputs: [http_source]
      outputs: [notify_webhook]
      steps: []
  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        url: "{webhook_base}/notify"
        body: full
"#
    );
    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;

    let notify = Notify::new("test_alert", Severity::Warning, "bypass test");
    let payload = serde_json::to_value(&notify).expect("Failed to serialize notify");
    post_json_with_retry(&format!("http://127.0.0.1:{port}"), &payload, None).await;
    wait_for_requests(server, 1, default_test_timeout()).await;

    let requests = server
        .received_requests()
        .await
        .expect("Failed to read requests");
    assert_eq!(
        requests.len(),
        1,
        "Warning alert should bypass the active window"
    );

    // Best-effort teardown: the send result and the join/timeout outcome are
    // deliberately ignored.
    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(default_test_timeout(), engine_handle).await;
}