#![cfg(feature = "http-client")]
use pipeflow::source::Source;
use pipeflow::source::http_client::{HttpClientConfig, HttpClientSource};
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
#[tokio::test]
async fn test_interval_scheduling() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/data"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"status": "ok"
})))
.mount(&mock_server)
.await;
let config = HttpClientConfig {
url: format!("{}/data", mock_server.uri()),
interval: Duration::from_millis(200),
..Default::default()
};
let source = HttpClientSource::new("test_interval", config).unwrap();
let (tx, mut rx) = broadcast::channel(100);
let (shutdown_tx, _) = broadcast::channel(1);
let shutdown_rx = shutdown_tx.subscribe();
let source_handle = tokio::spawn(async move {
let sender = pipeflow::source::MessageSender::new(tx, None);
source.run(sender, shutdown_rx).await
});
let mut timestamps = Vec::new();
let _start_time = Instant::now();
for _ in 0..4 {
if tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.is_ok()
{
timestamps.push(Instant::now());
} else {
break;
}
}
let _ = shutdown_tx.send(());
let _ = source_handle.await;
assert_eq!(timestamps.len(), 4, "Should receive 4 messages");
for i in 0..timestamps.len() - 1 {
let diff = timestamps[i + 1].duration_since(timestamps[i]);
println!("Interval {}: {:?}", i, diff);
assert!(
diff >= Duration::from_millis(150),
"Interval too short: {:?}",
diff
);
assert!(
diff <= Duration::from_millis(400),
"Interval too long: {:?}",
diff
);
}
}
#[tokio::test]
async fn test_cron_scheduling() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/cron"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"status": "ok"
})))
.mount(&mock_server)
.await;
let config = HttpClientConfig {
url: format!("{}/cron", mock_server.uri()),
schedule: Some("* * * * * *".to_string()),
..Default::default()
};
let source = HttpClientSource::new("test_cron", config).unwrap();
let (tx, mut rx) = broadcast::channel(100);
let (shutdown_tx, _) = broadcast::channel(1);
let shutdown_rx = shutdown_tx.subscribe();
let source_handle = tokio::spawn(async move {
let sender = pipeflow::source::MessageSender::new(tx, None);
source.run(sender, shutdown_rx).await
});
let mut timestamps = Vec::new();
for _ in 0..3 {
if tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.is_ok()
{
timestamps.push(Instant::now());
}
}
let _ = shutdown_tx.send(());
let _ = source_handle.await;
assert_eq!(timestamps.len(), 3, "Should receive 3 messages from cron");
for i in 0..timestamps.len() - 1 {
let diff = timestamps[i + 1].duration_since(timestamps[i]);
println!("Cron Interval {}: {:?}", i, diff);
assert!(
diff >= Duration::from_millis(800),
"Cron interval too short: {:?}",
diff
);
assert!(
diff <= Duration::from_millis(1500),
"Cron interval too long: {:?}",
diff
);
}
}