use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::{Duration, Instant};
use camel_api::Lifecycle;
use camel_prometheus::PrometheusService;
/// Polls `http://127.0.0.1:{port}/metrics` until a request succeeds.
///
/// Any `Ok` result from `send()` counts as "server is up"; the response
/// itself is not inspected. Each attempt carries a 100 ms request timeout
/// and a failed attempt is followed by a 10 ms pause before retrying.
///
/// # Errors
///
/// Returns a message naming the port, the timeout and the last request
/// error once an attempt fails after more than `timeout_ms` milliseconds
/// have elapsed since the first attempt began.
async fn wait_for_server(port: u16, timeout_ms: u64) -> Result<(), String> {
    let start = Instant::now();
    let deadline = Duration::from_millis(timeout_ms);
    // The URL does not change between attempts, so build it only once.
    let url = format!("http://127.0.0.1:{}/metrics", port);
    let client = reqwest::Client::new();
    loop {
        let attempt = client
            .get(url.as_str())
            .timeout(Duration::from_millis(100))
            .send()
            .await;
        match attempt {
            Ok(_) => return Ok(()),
            // The deadline is only evaluated after a failed attempt, so at
            // least one request is always made, even with `timeout_ms == 0`.
            Err(err) if start.elapsed() > deadline => {
                return Err(format!(
                    "Server on port {} did not start within {}ms (last error: {})",
                    port, timeout_ms, err
                ));
            }
            Err(_) => tokio::time::sleep(Duration::from_millis(10)).await,
        }
    }
}
/// Exercises a full start -> scrape -> stop cycle of `PrometheusService`.
///
/// Metrics are recorded through the service's collector before the service
/// is started; the test then fetches `/metrics` and checks that the metric
/// names and the route label appear in the response body.
#[tokio::test]
async fn test_prometheus_service_lifecycle() {
    // Port 0 is requested here; the actual port is read back after `start()`.
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
    let mut service = PrometheusService::new(addr);
    assert_eq!(service.name(), "prometheus");

    // A single `expect` replaces the former `is_some()` assert + `unwrap()`.
    let collector = service
        .as_metrics_collector()
        .expect("Service should expose a metrics collector");
    collector.increment_exchanges("test-route");
    collector.increment_errors("test-route", "SomeError");
    collector.set_queue_depth("test-route", 5);

    service
        .start()
        .await
        .expect("Service should start successfully");
    let port = service.port();
    assert!(port > 0, "Port should be assigned after start");
    wait_for_server(port, 2000)
        .await
        .expect("Server should start within timeout");

    // The URL is passed by value, matching the other tests in this file.
    let response = reqwest::get(format!("http://127.0.0.1:{}/metrics", port))
        .await
        .expect("Failed to fetch metrics");
    assert!(response.status().is_success());
    let body = response.text().await.expect("Failed to read response body");

    assert!(
        body.contains("camel_exchanges_total"),
        "Response should contain exchanges metric"
    );
    assert!(
        body.contains("camel_errors_total"),
        "Response should contain errors metric"
    );
    assert!(
        body.contains("camel_queue_depth"),
        "Response should contain queue depth metric"
    );
    assert!(
        body.contains("test-route"),
        "Response should contain route label"
    );

    service
        .stop()
        .await
        .expect("Service should stop successfully");
}
/// Verifies that starting a second service on an already-occupied port fails
/// and that the error text indicates the address is in use.
#[tokio::test]
async fn test_prometheus_service_bind_error() {
    // First service: request port 0 and read the assigned port back.
    let any_port = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
    let mut first = PrometheusService::new(any_port);
    first
        .start()
        .await
        .expect("First service should start successfully");
    let port = first.port();
    wait_for_server(port, 2000)
        .await
        .expect("First server should start");

    // Second service: deliberately target the port the first one holds.
    let taken = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
    let mut second = PrometheusService::new(taken);
    let bind_error = match second.start().await {
        Ok(_) => panic!("Should fail to bind to already-used port {}", port),
        Err(err) => err,
    };

    // Accept any of the spellings the error text may use for this condition.
    let error_msg = bind_error.to_string();
    let accepted = [
        "AddrInUse",
        "Address already in use",
        "address already in use",
    ];
    let mentions_in_use = accepted.iter().any(|&needle| error_msg.contains(needle));
    assert!(
        mentions_in_use,
        "Error should indicate address in use: {}",
        error_msg
    );

    first.stop().await.expect("First service should stop");
}
/// Verifies that the service serves `/metrics` when it is registered as a
/// lifecycle on a `CamelContext` and started through the context.
#[tokio::test]
async fn test_prometheus_service_with_context() {
    use camel_core::context::CamelContext;
    use std::sync::atomic::Ordering;

    let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
    let prometheus = PrometheusService::new(bind_addr);
    // Take both handles now: `prometheus` is moved into the context below.
    let port_accessor = prometheus.port_accessor();
    let metrics = prometheus.as_metrics_collector().unwrap();

    let built = CamelContext::builder()
        .metrics(metrics)
        .build()
        .await
        .unwrap();
    let mut ctx = built.with_lifecycle(prometheus);

    ctx.start()
        .await
        .expect("Context should start successfully");

    // The port is read through the accessor obtained before the move.
    let port = port_accessor.load(Ordering::SeqCst);
    assert!(port > 0, "Port should be assigned after context start");
    wait_for_server(port, 2000)
        .await
        .expect("Server should start within timeout");

    let metrics_url = format!("http://127.0.0.1:{}/metrics", port);
    let response = reqwest::get(metrics_url)
        .await
        .expect("Failed to fetch metrics");
    let succeeded = response.status().is_success();
    assert!(succeeded, "Metrics endpoint should return success");

    ctx.stop().await.expect("Context should stop successfully");
}
/// Runs three consecutive start -> wait -> stop cycles.
///
/// A new `PrometheusService` is constructed for every cycle (the same
/// instance is not restarted), each time requesting port 0.
#[tokio::test]
async fn test_prometheus_service_multiple_start_stop_cycles() {
    for i in 0..3 {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
        let mut service = PrometheusService::new(addr);

        // Each panic message carries the underlying error so that a failing
        // cycle reports its cause instead of only the cycle number.
        service
            .start()
            .await
            .unwrap_or_else(|e| panic!("Cycle {}: start should succeed: {:?}", i, e));

        let port = service.port();
        wait_for_server(port, 2000)
            .await
            .unwrap_or_else(|e| panic!("Cycle {}: server should start: {}", i, e));

        service
            .stop()
            .await
            .unwrap_or_else(|e| panic!("Cycle {}: stop should succeed: {:?}", i, e));
    }
}