pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! End-to-end tests for notify sink providers (Telegram, Webhook, Email).

#![cfg(all(feature = "notify", feature = "http-server"))]

mod common;

use common::{
    TestHarness, allocate_local_port, default_test_timeout, post_json_with_retry,
    skip_if_no_network, wait_for_requests,
};
use pipeflow::common::types::{Notify, Severity};
use serde_json::Value;
use std::time::Duration;
use wiremock::matchers::{method, path};
use wiremock::{Mock, ResponseTemplate};

/// End-to-end check of the Telegram notify provider.
///
/// Posts a serialized `Notify` to the pipeline's HTTP source and verifies that
/// the mock server standing in for the Telegram API receives a `sendMessage`
/// request whose JSON body carries the configured `chat_id`, `parse_mode`,
/// `disable_web_page_preview`, and the rendered message template.
#[tokio::test]
async fn notify_telegram_e2e() {
    if skip_if_no_network() {
        return;
    }

    let harness = TestHarness::new().await;
    let telegram_api = &harness.mock_server;

    // Accept the call the sink makes for bot token `TEST_TOKEN`.
    let send_message = Mock::given(method("POST"))
        .and(path("/botTEST_TOKEN/sendMessage"))
        .respond_with(ResponseTemplate::new(200));
    send_message.mount(telegram_api).await;

    let port = allocate_local_port();
    // `{{{{ … }}}}` is `format!` escaping for a literal `{{ … }}` in the YAML.
    let pipeline_yaml = format!(
        r#"
pipeline:
  sources:
    - id: http_source
      type: http_server
      config:
        bind: "127.0.0.1:{port}"

  transforms:
    - id: passthrough
      inputs: [http_source]
      outputs: [notify_telegram]
      steps: []

  sinks:
    - id: notify_telegram
      type: notify
      config:
        provider: telegram
        bot_token: "TEST_TOKEN"
        chat_id: "12345"
        api_base_url: "{}"
        parse_mode: "HTML"
        disable_web_page_preview: true
        message: "Alert: {{{{ $.message }}}}"
"#,
        telegram_api.uri()
    );

    let (shutdown_tx, engine_handle) = harness.run_pipeline(&pipeline_yaml).await;

    // Feed one critical alert into the HTTP source.
    let alert = Notify::new("service_down", Severity::Critical, "db unreachable");
    let alert_json = serde_json::to_value(&alert).expect("Failed to serialize notify");
    let endpoint = format!("http://127.0.0.1:{port}");
    post_json_with_retry(&endpoint, &alert_json, None).await;

    wait_for_requests(telegram_api, 1, default_test_timeout()).await;
    let captured = telegram_api
        .received_requests()
        .await
        .expect("Failed to read requests");
    assert!(
        !captured.is_empty(),
        "Expected at least one telegram request"
    );

    // Inspect the first request the mock recorded.
    let sent = serde_json::from_slice::<Value>(&captured[0].body)
        .expect("Telegram body should be JSON");
    assert_eq!(sent["chat_id"], "12345");
    assert_eq!(sent["text"], "Alert: db unreachable");
    assert_eq!(sent["parse_mode"], "HTML");
    assert_eq!(sent["disable_web_page_preview"], true);

    // Best-effort shutdown: failures here are deliberately ignored.
    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(default_test_timeout(), engine_handle).await;
}

/// Test Webhook notify provider sends correctly formatted messages.
#[tokio::test]
async fn notify_webhook_e2e() {
    if skip_if_no_network() {
        return;
    }

    let harness = TestHarness::new().await;
    let server = &harness.mock_server;

    Mock::given(method("POST"))
        .and(path("/notify"))
        .respond_with(ResponseTemplate::new(200))
        .mount(server)
        .await;

    let port = allocate_local_port();
    let yaml = format!(
        r#"
pipeline:
  sources:
    - id: http_source
      type: http_server
      config:
        bind: "127.0.0.1:{port}"

  transforms:
    - id: passthrough
      inputs: [http_source]
      outputs: [notify_webhook]
      steps: []

  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        url: "{}/notify"
        body: full
        message: "Alert: {{{{ $.message }}}}"
"#,
        server.uri()
    );

    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;

    let notify = Notify::new("disk_full", Severity::Error, "disk at 95%");
    let payload = serde_json::to_value(&notify).expect("Failed to serialize notify");

    post_json_with_retry(&format!("http://127.0.0.1:{port}"), &payload, None).await;

    wait_for_requests(server, 1, default_test_timeout()).await;
    let requests = server
        .received_requests()
        .await
        .expect("Failed to read requests");
    assert!(
        !requests.is_empty(),
        "Expected at least one webhook request"
    );

    let body: Value =
        serde_json::from_slice(&requests[0].body).expect("Webhook body should be JSON");
    assert_eq!(body["payload"]["message"], "disk at 95%");
    assert_eq!(body["text"], "Alert: disk at 95%");

    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(default_test_timeout(), engine_handle).await;
}

/// Test Redis-backed silence suppresses duplicate notifications within the window.
///
/// The test runs in three phases against a mock webhook endpoint:
/// 1. a first alert is delivered;
/// 2. an identical alert sent right away is suppressed (request count stays 1);
/// 3. after sleeping past the configured 1s window, the same alert is
///    delivered again (request count becomes 2).
///
/// Skipped when the network is unavailable or `REDIS_URL` is not set.
#[cfg(feature = "redis")]
#[tokio::test]
async fn notify_silence_redis_e2e() {
    if skip_if_no_network() {
        return;
    }

    let Ok(redis_url) = std::env::var("REDIS_URL") else {
        eprintln!("REDIS_URL not set; skipping redis notify silence test");
        return;
    };

    let harness = TestHarness::new().await;
    let server = &harness.mock_server;

    Mock::given(method("POST"))
        .and(path("/notify"))
        .respond_with(ResponseTemplate::new(200))
        .mount(server)
        .await;

    // A fresh UUID in the prefix gives every run its own Redis key namespace.
    let key_prefix = format!("pipeflow:test:silence:{}:", uuid::Uuid::now_v7());
    let port = allocate_local_port();
    // `{{{{ … }}}}` is `format!` escaping for a literal `{{ … }}` in the YAML.
    let yaml = format!(
        r#"
system:
  notify:
    silence:
      window: 1s
      backend: redis
      key: "{{{{ $.name }}}}"
      redis:
        url: "{redis_url}"
        key_prefix: "{key_prefix}"

pipeline:
  sources:
    - id: http_source
      type: http_server
      config:
        bind: "127.0.0.1:{port}"

  transforms:
    - id: passthrough
      inputs: [http_source]
      outputs: [notify_webhook]
      steps: []

  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        url: "{}/notify"
        body: full
"#,
        server.uri()
    );

    let (shutdown_tx, engine_handle) = harness.run_pipeline(&yaml).await;

    let notify = Notify::new("price_alert", Severity::Warning, "price high");
    // Serialize by reference, as the sibling tests do; no clone is needed.
    let payload = serde_json::to_value(&notify).expect("Failed to serialize notify");
    let endpoint = format!("http://127.0.0.1:{port}");

    // Phase 1: the first alert goes through.
    post_json_with_retry(&endpoint, &payload, None).await;
    wait_for_requests(server, 1, default_test_timeout()).await;

    // Phase 2: an identical alert is sent immediately; after a short grace
    // period the mock must still have seen only the first request.
    post_json_with_retry(&endpoint, &payload, None).await;
    tokio::time::sleep(Duration::from_millis(200)).await;
    let received = server
        .received_requests()
        .await
        .expect("Failed to read requests")
        .len();
    assert_eq!(received, 1, "expected silence to suppress duplicate");

    // Phase 3: sleep past the 1s window, then the same alert must be delivered.
    tokio::time::sleep(Duration::from_secs(2)).await;
    post_json_with_retry(&endpoint, &payload, None).await;
    wait_for_requests(server, 2, default_test_timeout()).await;
    // Assert explicitly so this phase is checked the same way as phase 2,
    // rather than relying on how `wait_for_requests` behaves on timeout.
    let delivered = server
        .received_requests()
        .await
        .expect("Failed to read requests")
        .len();
    assert_eq!(
        delivered, 2,
        "expected notification to be delivered again after the silence window"
    );

    // Best-effort shutdown: failures here are deliberately ignored.
    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(default_test_timeout(), engine_handle).await;
}

/// End-to-end check that `active_window.bypass_severity` lets an alert at the
/// configured severity through.
///
/// The pipeline is configured with a one-minute active window and
/// `bypass_severity: warning`; a `Severity::Warning` alert posted to the HTTP
/// source must reach the mock webhook exactly once.
#[tokio::test]
async fn notify_active_window_bypass_severity_e2e() {
    if skip_if_no_network() {
        return;
    }

    let harness = TestHarness::new().await;
    let webhook = &harness.mock_server;

    let notify_route = Mock::given(method("POST"))
        .and(path("/notify"))
        .respond_with(ResponseTemplate::new(200));
    notify_route.mount(webhook).await;

    let port = allocate_local_port();
    // The active window is only one minute long (00:00-00:01), so the test
    // almost always runs outside it; `bypass_severity: warning` is what is
    // expected to let the Warning alert below go out immediately.
    let pipeline_yaml = format!(
        r#"
system:
  notify:
    active_window:
      start: "00:00"
      end: "00:01"
      bypass_severity: warning

pipeline:
  sources:
    - id: http_source
      type: http_server
      config:
        bind: "127.0.0.1:{port}"

  transforms:
    - id: passthrough
      inputs: [http_source]
      outputs: [notify_webhook]
      steps: []

  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        url: "{}/notify"
        body: full
"#,
        webhook.uri()
    );

    let (shutdown_tx, engine_handle) = harness.run_pipeline(&pipeline_yaml).await;

    // A Warning-level alert matches the configured bypass severity.
    let alert = Notify::new("test_alert", Severity::Warning, "bypass test");
    let alert_json = serde_json::to_value(&alert).expect("Failed to serialize notify");
    let endpoint = format!("http://127.0.0.1:{port}");
    post_json_with_retry(&endpoint, &alert_json, None).await;

    wait_for_requests(webhook, 1, default_test_timeout()).await;
    let captured = webhook
        .received_requests()
        .await
        .expect("Failed to read requests");
    let delivered = captured.len();
    assert_eq!(
        delivered,
        1,
        "Warning alert should bypass the active window"
    );

    // Best-effort shutdown: failures here are deliberately ignored.
    let _ = shutdown_tx.send(());
    let _ = tokio::time::timeout(default_test_timeout(), engine_handle).await;
}