pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
#![cfg(feature = "http-client")]

use pipeflow::source::Source;

use pipeflow::source::http_client::{HttpClientConfig, HttpClientSource};
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

#[tokio::test]
async fn test_interval_scheduling() {
    let mock_server = MockServer::start().await;

    Mock::given(method("GET"))
        .and(path("/data"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "status": "ok"
        })))
        .mount(&mock_server)
        .await;

    let config = HttpClientConfig {
        url: format!("{}/data", mock_server.uri()),
        interval: Duration::from_millis(200),
        ..Default::default()
    };

    let source = HttpClientSource::new("test_interval", config).unwrap();
    let (tx, mut rx) = broadcast::channel(100);
    let (shutdown_tx, _) = broadcast::channel(1);
    let shutdown_rx = shutdown_tx.subscribe();

    let source_handle = tokio::spawn(async move {
        let sender = pipeflow::source::MessageSender::new(tx, None);
        source.run(sender, shutdown_rx).await
    });

    let mut timestamps = Vec::new();
    let _start_time = Instant::now();

    // Collect 4 messages (approx 0s, 0.2s, 0.4s, 0.6s)
    // First message is immediate
    for _ in 0..4 {
        if tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .is_ok()
        {
            timestamps.push(Instant::now());
        } else {
            break;
        }
    }

    let _ = shutdown_tx.send(());
    let _ = source_handle.await;

    assert_eq!(timestamps.len(), 4, "Should receive 4 messages");

    // Check intervals
    // 0 -> 1: ~200ms
    // 1 -> 2: ~200ms
    // 2 -> 3: ~200ms
    // We allow some jitter (e.g. ±50ms)
    for i in 0..timestamps.len() - 1 {
        let diff = timestamps[i + 1].duration_since(timestamps[i]);
        println!("Interval {}: {:?}", i, diff);
        assert!(
            diff >= Duration::from_millis(150),
            "Interval too short: {:?}",
            diff
        );
        // On slow CI/local matching, it might be slightly longer, but shouldn't be too long
        assert!(
            diff <= Duration::from_millis(400),
            "Interval too long: {:?}",
            diff
        );
    }
}

#[tokio::test]
async fn test_cron_scheduling() {
    let mock_server = MockServer::start().await;

    Mock::given(method("GET"))
        .and(path("/cron"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "status": "ok"
        })))
        .mount(&mock_server)
        .await;

    // Schedule: every second
    let config = HttpClientConfig {
        url: format!("{}/cron", mock_server.uri()),
        schedule: Some("* * * * * *".to_string()),
        ..Default::default()
    };

    let source = HttpClientSource::new("test_cron", config).unwrap();
    let (tx, mut rx) = broadcast::channel(100);
    let (shutdown_tx, _) = broadcast::channel(1);
    let shutdown_rx = shutdown_tx.subscribe();

    let source_handle = tokio::spawn(async move {
        let sender = pipeflow::source::MessageSender::new(tx, None);
        source.run(sender, shutdown_rx).await
    });

    let mut timestamps = Vec::new();

    // Collect 3 messages
    // Cron runs at sec boundary.
    // E.g. 10:00:01, 10:00:02, 10:00:03...
    // The first run depends on when we start.
    // Wait for up to 4 seconds to catch at least 3 ticks.
    for _ in 0..3 {
        if tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .is_ok()
        {
            timestamps.push(Instant::now());
        }
    }

    let _ = shutdown_tx.send(());
    let _ = source_handle.await;

    assert_eq!(timestamps.len(), 3, "Should receive 3 messages from cron");

    for i in 0..timestamps.len() - 1 {
        let diff = timestamps[i + 1].duration_since(timestamps[i]);
        println!("Cron Interval {}: {:?}", i, diff);
        // Cron guarantees approx 1 second
        assert!(
            diff >= Duration::from_millis(800),
            "Cron interval too short: {:?}",
            diff
        );
        assert!(
            diff <= Duration::from_millis(1500),
            "Cron interval too long: {:?}",
            diff
        );
    }
}