use std::sync::Arc;
use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
use std::time::Instant;
use hashbrown::HashMap;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use crate::{Proxy, ProxyChain};
/// Result of pinging a set of services through a single proxy.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PingInfo {
    /// Measured ping in milliseconds for every service that was reached,
    /// keyed by the service host name.
    pub pinged_services: HashMap<String, u64>,
    /// Integer mean of the recorded pings in milliseconds, or `None` when no
    /// average was computed.
    pub average_ping: Option<u64>,
}
/// Result of pinging a set of services through a chain of proxies.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChainPingInfo {
    /// Time in milliseconds taken to extend the tunnel from one proxy to the
    /// next, keyed by `(from_address, to_address)` rendered as strings.
    pub from_to_ping_map: HashMap<(String, String), u64>,
    /// Sum of the hop-to-hop pings in milliseconds, or `None` when it was
    /// never computed.
    pub from_to_total_ping: Option<u64>,
    /// Integer mean of the hop-to-hop pings in milliseconds, or `None` when it
    /// was never computed.
    pub from_to_average_ping: Option<u64>,
    /// Measured ping in milliseconds for every service that was reached
    /// through the chain, keyed by the service host name.
    pub pinged_services: HashMap<String, u64>,
    /// Integer mean of the recorded service pings in milliseconds, or `None`
    /// when no average was computed.
    pub average_ping: Option<u64>,
}
/// Returns the `(host, port)` pairs that are pinged when the caller does not
/// supply its own list. Every default service is contacted on port 80.
fn default_pinged_services() -> Vec<(String, u16)> {
    const DEFAULT_PORT: u16 = 80;
    const DEFAULT_HOSTS: [&str; 6] = [
        "cloudflare.com",
        "facebook.com",
        "yandex.ru",
        "youtube.com",
        "github.com",
        "reddit.com",
    ];
    DEFAULT_HOSTS
        .iter()
        .map(|host| (host.to_string(), DEFAULT_PORT))
        .collect()
}
pub async fn ping_proxy(proxy: &Proxy, pinged_services: Option<Vec<(String, u16)>>) -> PingInfo {
let mut ping_info = PingInfo {
pinged_services: HashMap::new(),
average_ping: None,
};
if !proxy.is_available().await {
return ping_info;
}
let mut total_pinged_services = 0;
let mut total_ping = 0;
let ps = if let Some(services) = pinged_services {
services
} else {
default_pinged_services()
};
for (service_host, service_port) in ps {
let start_time = Instant::now();
match proxy.connect(&service_host, service_port).await {
Ok(mut s) => {
let _ = s.shutdown().await;
let ping = start_time.elapsed().as_millis() as u64;
total_pinged_services += 1;
total_ping += ping as u64;
ping_info.pinged_services.insert(service_host, ping);
}
Err(_) => {}
}
}
ping_info.average_ping = Some(total_ping / total_pinged_services as u64);
ping_info
}
pub async fn ping_proxy_chain(chain: &ProxyChain, pinged_services: Option<Vec<(String, u16)>>) -> ChainPingInfo {
let mut ping_info = ChainPingInfo {
from_to_ping_map: HashMap::new(),
from_to_total_ping: None,
from_to_average_ping: None,
pinged_services: HashMap::new(),
average_ping: None,
};
let mut total_pinged_services = 0;
let mut total_ping = 0;
let ps = if let Some(services) = pinged_services {
services
} else {
default_pinged_services()
};
for (service_host, service_port) in ps {
let proxy_chain = chain.get_chain();
let first_proxy = &proxy_chain[0];
let first_addr = first_proxy.get_address();
let mut stream = match TcpStream::connect(first_addr).await {
Ok(s) => s,
Err(_) => continue,
};
let mut last_proxy_addr = first_addr;
let mut from_to_total_ping = 0;
let mut from_to_pinged = 0;
for proxy in &proxy_chain[1..] {
let proxy_ip = if let Some(ip) = proxy.get_ip() {
ip
} else {
continue;
};
let proxy_port = if let Some(port) = proxy.get_port() {
port
} else {
continue;
};
let start_time = Instant::now();
stream = match proxy.connect_with_stream(stream, proxy_ip, proxy_port).await {
Ok(s) => s,
Err(_) => return ping_info, };
let from_to = (last_proxy_addr.to_string(), proxy.get_address().to_string());
let ping = start_time.elapsed().as_millis() as u64;
ping_info.from_to_ping_map.insert(from_to, ping);
last_proxy_addr = proxy.get_address();
from_to_total_ping += ping;
from_to_pinged += 1;
}
ping_info.from_to_total_ping = Some(from_to_total_ping);
ping_info.from_to_average_ping = Some(from_to_total_ping / from_to_pinged);
let last_proxy = &proxy_chain[proxy_chain.len() - 1];
let start_time = Instant::now();
match last_proxy.connect_with_stream(stream, &service_host, service_port).await {
Ok(mut s) => {
let _ = s.shutdown().await;
let ping = start_time.elapsed().as_millis() as u64;
total_pinged_services += 1;
total_ping += ping;
ping_info.pinged_services.insert(service_host, ping);
}
Err(_) => continue,
}
}
let average_ping = total_ping / total_pinged_services;
ping_info.average_ping = Some(average_ping);
ping_info
}
pub async fn ping_proxy_parallel(proxy: Arc<Proxy>, pinged_services: Option<Vec<(String, u16)>>) -> PingInfo {
let ping_info = PingInfo {
pinged_services: HashMap::new(),
average_ping: None,
};
let ping_info_mutex = Arc::new(Mutex::new(ping_info));
if !proxy.is_available().await {
return ping_info_mutex.lock().await.clone();
}
let total_pinged_services = Arc::new(AtomicU8::new(0));
let total_ping = Arc::new(AtomicU64::new(0));
let mut handles = Vec::new();
let ps = if let Some(services) = pinged_services {
services
} else {
default_pinged_services()
};
for (service_host, service_port) in ps {
let proxy_clone = Arc::clone(&proxy);
let ping_info_mutex_clone = Arc::clone(&ping_info_mutex);
let total_pinged_services_clone = Arc::clone(&total_pinged_services);
let total_ping_clone = Arc::clone(&total_ping);
let handle = tokio::spawn(async move {
let start_time = Instant::now();
match proxy_clone.connect(&service_host, service_port).await {
Ok(mut s) => {
let _ = s.shutdown().await;
let ping = start_time.elapsed().as_millis() as u64;
total_pinged_services_clone.fetch_add(1, Ordering::SeqCst);
total_ping_clone.fetch_add(ping, Ordering::SeqCst);
ping_info_mutex_clone.lock().await.pinged_services.insert(service_host, ping);
}
Err(_) => {}
}
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
let average_ping = total_ping.load(Ordering::SeqCst) / total_pinged_services.load(Ordering::SeqCst) as u64;
ping_info_mutex.lock().await.average_ping = Some(average_ping);
ping_info_mutex.lock().await.clone()
}
pub async fn ping_proxy_chain_parallel(chain: Arc<ProxyChain>, pinged_services: Option<Vec<(String, u16)>>) -> ChainPingInfo {
let ping_info = ChainPingInfo {
from_to_ping_map: HashMap::new(),
from_to_total_ping: None,
from_to_average_ping: None,
pinged_services: HashMap::new(),
average_ping: None,
};
let ping_info_mutex = Arc::new(Mutex::new(ping_info));
let total_pinged_services = Arc::new(AtomicU8::new(0));
let total_ping = Arc::new(AtomicU64::new(0));
let mut handles = Vec::new();
let ps = if let Some(services) = pinged_services {
services
} else {
default_pinged_services()
};
for (service_host, service_port) in ps {
let chain_clone = Arc::clone(&chain);
let ping_info_mutex_clone = Arc::clone(&ping_info_mutex);
let total_pinged_services_clone = Arc::clone(&total_pinged_services);
let total_ping_clone = Arc::clone(&total_ping);
let handle = tokio::spawn(async move {
let proxy_chain = chain_clone.get_chain();
let first_proxy = &proxy_chain[0];
let first_addr = first_proxy.get_address();
let mut stream = match TcpStream::connect(first_addr).await {
Ok(s) => s,
Err(_) => return,
};
let mut last_proxy_addr = first_addr;
let mut from_to_total_ping = 0;
let mut from_to_pinged = 0;
for proxy in &proxy_chain[1..] {
let proxy_ip = if let Some(ip) = proxy.get_ip() {
ip
} else {
return;
};
let proxy_port = if let Some(port) = proxy.get_port() {
port
} else {
return;
};
let start_time = Instant::now();
stream = match proxy.connect_with_stream(stream, proxy_ip, proxy_port).await {
Ok(s) => s,
Err(_) => return,
};
let from_to = (last_proxy_addr.to_string(), proxy.get_address().to_string());
let ping = start_time.elapsed().as_millis() as u64;
ping_info_mutex_clone.lock().await.from_to_ping_map.insert(from_to, ping);
last_proxy_addr = proxy.get_address();
from_to_total_ping += ping;
from_to_pinged += 1;
}
ping_info_mutex_clone.lock().await.from_to_total_ping = Some(from_to_total_ping);
ping_info_mutex_clone.lock().await.from_to_average_ping = Some(from_to_total_ping / from_to_pinged);
let last_proxy = &proxy_chain[proxy_chain.len() - 1];
let start_time = Instant::now();
match last_proxy.connect_with_stream(stream, &service_host, service_port).await {
Ok(mut s) => {
let _ = s.shutdown().await;
let ping = start_time.elapsed().as_millis();
total_pinged_services_clone.fetch_add(1, Ordering::SeqCst);
total_ping_clone.fetch_add(ping as u64, Ordering::SeqCst);
ping_info_mutex_clone.lock().await.pinged_services.insert(service_host, ping as u64);
}
Err(_) => return,
}
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
let average_ping = total_ping.load(Ordering::SeqCst) / total_pinged_services.load(Ordering::SeqCst) as u64;
ping_info_mutex.lock().await.average_ping = Some(average_ping);
ping_info_mutex.lock().await.clone()
}