use crate::integrations::websocket::{
CoordinationSettings, DistributedTestConfig, TestConfig, WebSocketMessage, WorkerAssignment,
};
use anyhow::Result;
use chrono::Utc;
use futures::{SinkExt, StreamExt};
use std::time::Duration;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use uuid::Uuid;
/// Client that sends distributed-test commands to a coordinator over WebSocket.
///
/// The client only stores the coordinator URL; every command method opens its
/// own connection via `connect_async`.
pub struct DistributedClient {
    /// WebSocket URL of the coordinator, built by `new` as `ws://{host}:{port}`.
    coordinator_url: String,
}
impl DistributedClient {
pub fn new(coordinator_host: &str, coordinator_port: u16) -> Self {
Self {
coordinator_url: format!("ws://{}:{}", coordinator_host, coordinator_port),
}
}
pub async fn start_test(&self, config: DistributedTestConfig) -> Result<()> {
let (ws_stream, _) = connect_async(&self.coordinator_url).await?;
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
let test_command = WebSocketMessage::TestCommand {
timestamp: Utc::now(),
command_id: Uuid::new_v4().to_string(),
command_type: crate::integrations::websocket::TestCommandType::Start,
test_config: config,
target_workers: Vec::new(), };
let json = serde_json::to_string(&test_command)?;
ws_sender
.send(tokio_tungstenite::tungstenite::protocol::Message::Text(
json.into(),
))
.await?;
let response = timeout(Duration::from_secs(10), ws_receiver.next()).await;
match response {
Ok(Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Text(text)))) => {
if let Ok(message) = serde_json::from_str::<WebSocketMessage>(&text) {
match message {
WebSocketMessage::TestCommandResponse {
status, message, ..
} => {
println!("Test command response: {:?} - {}", status, message);
}
_ => {
println!("Unexpected response message");
}
}
}
}
_ => {
return Err(anyhow::anyhow!("Timeout waiting for test command response"));
}
}
Ok(())
}
pub async fn stop_test(&self) -> Result<()> {
let (ws_stream, _) = connect_async(&self.coordinator_url).await?;
let (mut ws_sender, _) = ws_stream.split();
let stop_command = WebSocketMessage::TestCommand {
timestamp: Utc::now(),
command_id: Uuid::new_v4().to_string(),
command_type: crate::integrations::websocket::TestCommandType::Stop,
test_config: DistributedTestConfig {
test_id: "stop".to_string(),
base_config: TestConfig {
url: "".to_string(),
concurrent_requests: 0,
rps: None,
duration_secs: 0,
total_requests: None,
method: "GET".to_string(),
user_agent_mode: "default".to_string(),
},
worker_assignments: Vec::new(),
coordination_settings: CoordinationSettings {
synchronized_start: false,
synchronized_stop: false,
sync_timeout_secs: 30,
max_sync_wait_secs: 60,
heartbeat_interval_secs: 10,
metrics_reporting_interval_secs: 5,
timeout_secs: 300,
},
},
target_workers: Vec::new(), };
let json = serde_json::to_string(&stop_command)?;
ws_sender
.send(tokio_tungstenite::tungstenite::protocol::Message::Text(
json.into(),
))
.await?;
println!("Stop command sent");
Ok(())
}
pub async fn monitor_test(&self, duration_secs: u64) -> Result<()> {
let (ws_stream, _) = connect_async(&self.coordinator_url).await?;
let (_, mut ws_receiver) = ws_stream.split();
println!(
"Monitoring distributed test for {} seconds...",
duration_secs
);
let monitor_duration = Duration::from_secs(duration_secs);
let monitor_timeout = timeout(monitor_duration, async {
while let Some(msg) = ws_receiver.next().await {
match msg {
Ok(tokio_tungstenite::tungstenite::protocol::Message::Text(text)) => {
if let Ok(message) = serde_json::from_str::<WebSocketMessage>(&text) {
match message {
WebSocketMessage::CoordinatorStatus {
connected_workers,
test_status,
..
} => {
println!(
"Coordinator status: {:?}, Workers: {}",
test_status,
connected_workers.len()
);
}
WebSocketMessage::WorkerMetrics {
worker_id, metrics, ..
} => {
println!(
"Worker {}: {} RPS, {} errors",
worker_id, metrics.current_rps, metrics.requests_failed
);
}
WebSocketMessage::TestCompleted { summary, .. } => {
println!("Test completed: {:?}", summary);
break;
}
_ => {}
}
}
}
Ok(tokio_tungstenite::tungstenite::protocol::Message::Close(_)) => {
println!("Connection closed");
break;
}
Err(e) => {
eprintln!("WebSocket error: {}", e);
break;
}
_ => {}
}
}
});
match monitor_timeout.await {
Ok(_) => println!("Monitoring completed"),
Err(_) => println!("Monitoring timeout reached"),
}
Ok(())
}
}
/// Builds a [`DistributedTestConfig`] that splits a GET load test evenly across workers.
///
/// * `url` - target URL stored in the base config.
/// * `total_concurrent` - total concurrency; each worker gets
///   `total_concurrent / worker_ids.len()` (integer division, remainder dropped).
/// * `total_rps` - optional total request rate, split across workers the same way.
/// * `duration_secs` - test duration, applied to the base config and every worker.
/// * `worker_ids` - one [`WorkerAssignment`] is produced per id, in order; the
///   worker at index `i` gets a start delay of `i * 0.1` seconds.
///
/// The returned config has a freshly generated UUID as `test_id`, synchronized
/// start/stop enabled, and fixed coordination timings.
///
/// An empty `worker_ids` yields a config with no worker assignments. This no
/// longer panics with a division by zero when `total_rps` is `Some`.
pub fn create_simple_test_config(
    url: String,
    total_concurrent: usize,
    total_rps: Option<u64>,
    duration_secs: u64,
    worker_ids: Vec<String>,
) -> DistributedTestConfig {
    // Clamp the divisor to at least 1 so both splits stay defined for an empty
    // worker list. In that case no assignments are produced, so the per-worker
    // values are never used.
    let divisor = worker_ids.len().max(1);
    let concurrent_per_worker = total_concurrent / divisor;
    // usize -> u64 is lossless on every platform with pointers of 64 bits or fewer.
    let rps_per_worker = total_rps.map(|rps| rps / divisor as u64);

    let worker_assignments = worker_ids
        .into_iter()
        .enumerate()
        .map(|(i, worker_id)| WorkerAssignment {
            worker_id,
            concurrent_requests: concurrent_per_worker,
            rps: rps_per_worker,
            duration_secs: Some(duration_secs),
            // Stagger worker start-up by 100 ms per worker index.
            start_delay_secs: i as f64 * 0.1,
        })
        .collect();

    DistributedTestConfig {
        test_id: Uuid::new_v4().to_string(),
        base_config: TestConfig {
            url,
            concurrent_requests: total_concurrent,
            rps: total_rps,
            duration_secs,
            total_requests: None,
            method: "GET".to_string(),
            user_agent_mode: "default".to_string(),
        },
        worker_assignments,
        coordination_settings: CoordinationSettings {
            synchronized_start: true,
            synchronized_stop: true,
            sync_timeout_secs: 30,
            max_sync_wait_secs: 60,
            heartbeat_interval_secs: 10,
            metrics_reporting_interval_secs: 5,
            timeout_secs: 300,
        },
    }
}