#![allow(dead_code, unused)]
use mq_bridge::models::{Endpoint, EndpointType, HttpConfig, Route};
use mq_bridge::test_utils::{setup_logging, PERF_TEST_MESSAGE_COUNT};
use serde_yaml_ng;
use std::collections::HashMap;
use std::time::{Duration, Instant};
const CONFIG_YAML: &str = r#"
routes:
memory_to_http:
concurrency: 4
batch_size: 128
input:
memory: { topic: "test-in-http" }
output:
http:
url: "http://127.0.0.1:{out_port}"
request_timeout_ms: 5000
batch_concurrency: 4
http_to_memory:
concurrency: 4
batch_size: 128
input:
http:
url: "127.0.0.1:{out_port}"
internal_buffer_size: {buffer_size}
fire_and_forget: false
output:
memory: { topic: "test-out-http", capacity: {out_capacity} }
"#;
fn get_free_port() -> u16 {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
listener.local_addr().unwrap().port()
}
async fn wait_for_server_ready(addr: &str, timeout: Duration) -> bool {
let start = Instant::now();
while start.elapsed() < timeout {
if tokio::net::TcpStream::connect(addr).await.is_ok() {
return true;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
false
}
pub async fn test_http_performance_pipeline() {
setup_logging();
tokio::time::timeout(Duration::from_secs(60), async {
let in_route_name = "memory_to_http".to_string();
let out_route_name = "http_to_memory".to_string();
let mut deployed_out_route: Option<Route> = None;
let mut deployed_in_route: Option<Route> = None;
for _attempt in 0..5 {
let port = get_free_port();
let config_yaml = CONFIG_YAML
.replace("{out_port}", &port.to_string())
.replace(
"{out_capacity}",
&(PERF_TEST_MESSAGE_COUNT + 1000).to_string(),
)
.replace("{buffer_size}", &(PERF_TEST_MESSAGE_COUNT * 2).to_string());
let yaml_val: serde_yaml_ng::Value =
serde_yaml_ng::from_str(&config_yaml).expect("Failed to parse YAML config");
let routes_val = yaml_val.get("routes").expect("YAML must have 'routes' key");
let routes: HashMap<String, Route> =
serde_yaml_ng::from_value(routes_val.clone()).expect("Failed to parse routes");
let in_route = routes[&in_route_name].clone();
let out_route = routes[&out_route_name].clone();
match out_route.deploy(&out_route_name).await {
Ok(_) => {
let addr = format!("127.0.0.1:{}", port);
if wait_for_server_ready(&addr, Duration::from_secs(5)).await {
deployed_out_route = Some(out_route);
deployed_in_route = Some(in_route);
break;
} else {
let _ = mq_bridge::Route::stop(&out_route_name).await;
tokio::time::sleep(Duration::from_millis(50)).await;
continue;
}
}
Err(e) => {
eprintln!("Failed to deploy http consumer on port {}: {}", port, e);
tokio::time::sleep(Duration::from_millis(50)).await;
continue;
}
}
}
let in_route = deployed_in_route.expect("Failed to deploy routes after retries");
let out_route = deployed_out_route.expect("Failed to deploy http consumer after retries");
let memory_channel = out_route.output.channel().unwrap();
in_route
.deploy(&in_route_name)
.await
.expect("Failed to deploy memory_to_http route");
let in_channel = in_route.input.channel().unwrap();
let messages = mq_bridge::test_utils::generate_test_messages(PERF_TEST_MESSAGE_COUNT);
in_channel.fill_messages(messages).await.unwrap();
let deadline = Duration::from_secs(45);
let start = Instant::now();
let mut last_log = Instant::now();
let mut received = 0usize;
while start.elapsed() < deadline {
let batch = memory_channel.drain_messages();
if !batch.is_empty() {
received += batch.len();
}
if received >= PERF_TEST_MESSAGE_COUNT {
break;
}
if last_log.elapsed() >= Duration::from_secs(5) {
println!(
"Progress: {}/{} messages received",
received, PERF_TEST_MESSAGE_COUNT
);
last_log = Instant::now();
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
mq_bridge::Route::stop(&in_route_name).await;
mq_bridge::Route::stop(&out_route_name).await;
assert_eq!(
received, PERF_TEST_MESSAGE_COUNT,
"Expected {} messages, received {}",
PERF_TEST_MESSAGE_COUNT, received
);
})
.await
.expect("HTTP pipeline test timed out");
}
#[cfg(feature = "http")]
#[tokio::test(flavor = "multi_thread")]
async fn test_http_concurrency() {
setup_logging();
let port = get_free_port();
let input = Endpoint::new(EndpointType::Http(HttpConfig {
url: format!("127.0.0.1:{}", port),
..Default::default()
}));
let sender = Endpoint::new(EndpointType::Http(HttpConfig {
url: format!("http://127.0.0.1:{}", port),
..Default::default()
}));
let output = Endpoint::new_memory("con_out_http", 10);
mq_bridge::test_utils::run_concurrency_test(input, output, sender).await;
}