use dotenvy::dotenv;
use kode_bridge::{ClientConfig, IpcHttpClient, KodeBridgeError, Result};
use serde::{Deserialize, Serialize};
use std::env;
use std::sync::Arc;
use std::time::{Duration, Instant};
#[derive(Debug, Deserialize, Serialize)]
pub struct UpdateProxyPayload {
pub name: String,
}
async fn concurrent_proxy_test(
client: Arc<IpcHttpClient>,
proxy_group: &str,
proxies: Vec<&str>,
) -> Result<()> {
println!(
"🚀 Starting concurrent proxy testing for group: {}",
proxy_group
);
println!("📊 Testing {} proxies concurrently", proxies.len());
let start_time = Instant::now();
let mut tasks = Vec::new();
for (index, proxy_name) in proxies.iter().enumerate() {
let client = client.clone();
let proxy_group = proxy_group.to_string();
let proxy_name = proxy_name.to_string();
let task = tokio::spawn(async move {
let task_start = Instant::now();
println!(" 🔄 Task {}: Testing proxy '{}'", index + 1, proxy_name);
let result = test_proxy_switch(&client, &proxy_group, &proxy_name).await;
let duration = task_start.elapsed();
let success_status = match &result {
Ok(success) => {
if *success {
println!(
" ✅ Task {}: Proxy '{}' switched successfully in {:?}",
index + 1,
proxy_name,
duration
);
"success"
} else {
println!(
" ⚠️ Task {}: Proxy '{}' switch failed in {:?}",
index + 1,
proxy_name,
duration
);
"failed"
}
}
Err(e) => {
println!(
" ❌ Task {}: Proxy '{}' error in {:?}: {}",
index + 1,
proxy_name,
duration,
e
);
"error"
}
};
(index, proxy_name, duration, success_status, result.is_ok())
});
tasks.push(task);
}
let mut results = Vec::new();
for task in tasks {
match task.await {
Ok(result) => results.push(result),
Err(e) => println!(" ❌ Task join error: {}", e),
}
}
let total_duration = start_time.elapsed();
let successful_count = results
.iter()
.filter(|(_, _, _, status, _)| *status == "success")
.count();
let failed_count = results.iter().filter(|(_, _, _, _, ok)| !ok).count();
let timeout_count = results
.iter()
.filter(|(_, _, _, status, _)| *status == "error")
.count();
println!();
println!("📊 Concurrent Test Results:");
println!(" 🎯 Total proxies tested: {}", proxies.len());
println!(" ✅ Successful switches: {}", successful_count);
println!(" ❌ Failed requests: {}", failed_count);
println!(" ⏰ Timeout errors: {}", timeout_count);
println!(" 🕐 Total time: {:?}", total_duration);
println!(
" 📈 Average time per request: {:?}",
Duration::from_millis(total_duration.as_millis() as u64 / proxies.len() as u64)
);
let mut durations: Vec<Duration> = results
.iter()
.map(|(_, _, duration, _, _)| *duration)
.collect();
durations.sort();
if !durations.is_empty() {
let min_duration = durations.first().unwrap();
let max_duration = durations.last().unwrap();
let median_duration = durations[durations.len() / 2];
println!(" 📊 Latency Analysis:");
println!(" ⚡ Fastest: {:?}", min_duration);
println!(" 🐌 Slowest: {:?}", max_duration);
println!(" 📊 Median: {:?}", median_duration);
}
let timeout_rate = timeout_count as f64 / proxies.len() as f64;
if timeout_rate > 0.3 {
println!();
println!(
"⚠️ High timeout rate detected ({:.1}%)",
timeout_rate * 100.0
);
println!("💡 Suggestions:");
println!(
" - Reduce concurrent requests (current: {})",
proxies.len()
);
println!(" - Increase timeout duration");
println!(" - Check server load and network conditions");
}
Ok(())
}
async fn test_proxy_switch(
client: &IpcHttpClient,
proxy_group: &str,
proxy_name: &str,
) -> Result<bool> {
let path = format!("/proxies/{}", proxy_group);
let payload = UpdateProxyPayload {
name: proxy_name.to_string(),
};
let response = client
.put(&path)
.json_body(&serde_json::to_value(&payload)?)
.timeout(Duration::from_secs(8)) .send()
.await?;
if response.is_success() {
tokio::time::sleep(Duration::from_millis(500)).await;
verify_proxy_switch(client, proxy_group, proxy_name).await
} else {
Ok(false)
}
}
async fn verify_proxy_switch(
client: &IpcHttpClient,
proxy_group: &str,
expected_proxy: &str,
) -> Result<bool> {
let response = client
.get("/proxies")
.timeout(Duration::from_secs(5))
.send()
.await?;
if !response.is_success() {
return Ok(false);
}
let proxy_data = response.json_value()?;
if let Some(proxies_obj) = proxy_data.get("proxies").and_then(|v| v.as_object()) {
if let Some(group_info) = proxies_obj.get(proxy_group) {
if let Some(current_proxy) = group_info.get("now").and_then(|v| v.as_str()) {
return Ok(current_proxy == expected_proxy);
}
}
}
Ok(false)
}
#[tokio::main]
async fn main() -> Result<()> {
dotenv().ok();
println!("🧪 Concurrent Proxy Testing Example");
println!("=====================================");
let ipc_path = env::var("CUSTOM_SOCK").unwrap_or_else(|_| "/tmp/example.sock".to_string());
let config = ClientConfig {
default_timeout: Duration::from_secs(8),
enable_pooling: true,
max_retries: 3,
retry_delay: Duration::from_millis(100),
max_concurrent_requests: 8, max_requests_per_second: Some(5.0), ..Default::default()
};
let client = Arc::new(IpcHttpClient::with_config(&ipc_path, config)?);
println!("📡 Fetching available proxies...");
let response = client
.get("/proxies")
.timeout(Duration::from_secs(10))
.send()
.await?;
if !response.is_success() {
return Err(KodeBridgeError::connection(format!(
"Failed to get proxies: {}",
response.status()
)));
}
let proxy_data = response.json_value()?;
if let Some(proxies_obj) = proxy_data.get("proxies").and_then(|v| v.as_object()) {
for (group_name, group_info) in proxies_obj.iter() {
if let Some(all_proxies) = group_info.get("all").and_then(|v| v.as_array()) {
if all_proxies.len() >= 3 {
let proxy_names: Vec<&str> = all_proxies
.iter()
.filter_map(|v| v.as_str())
.take(8) .collect();
if !proxy_names.is_empty() {
println!(
"🎯 Found group '{}' with {} proxies for testing",
group_name,
proxy_names.len()
);
if let Err(e) =
concurrent_proxy_test(client.clone(), group_name, proxy_names).await
{
println!("❌ Concurrent test failed: {}", e);
}
if let Some(stats) = client.pool_stats() {
println!();
println!("🔗 Connection Pool Stats: {}", stats);
}
break;
}
}
}
}
} else {
println!("⚠️ No suitable proxy groups found for testing");
}
client.close();
println!();
println!("🎯 Concurrent testing completed!");
println!("💡 Key optimizations applied:");
println!(" ✅ Connection pooling with increased size (20 connections)");
println!(" ✅ Reduced timeouts (8s request, 5s connection)");
println!(" ✅ Enhanced retry logic with exponential backoff");
println!(" ✅ Concurrent request limiting (8 max)");
println!(" ✅ Rate limiting (5 requests/sec)");
println!(" ✅ Jitter in retry delays to prevent thundering herd");
Ok(())
}