use dashmap::DashMap;
use std::time::Duration;
use crate::error::ScatterProxyError;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyState {
Unknown,
Active,
Dead,
}
struct ProxyInfo {
state: ProxyState,
}
pub struct ProxyManager {
proxies: DashMap<String, ProxyInfo>,
clients: DashMap<String, reqwest::Client>,
proxy_timeout: Duration,
}
impl ProxyManager {
pub fn new(proxy_timeout: Duration) -> Self {
Self {
proxies: DashMap::new(),
clients: DashMap::new(),
proxy_timeout,
}
}
pub async fn fetch_and_add(
&self,
sources: &[String],
prefer_remote_dns: bool,
) -> Result<usize, ScatterProxyError> {
let http_client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| ScatterProxyError::Init(format!("failed to build HTTP client: {e}")))?;
let mut added = 0usize;
for source in sources {
match http_client.get(source).send().await {
Ok(resp) => match resp.text().await {
Ok(body) => {
for line in body.lines() {
let normalized = Self::normalize_proxy_url(line, prefer_remote_dns);
if normalized.is_empty() {
continue;
}
if !self.proxies.contains_key(&normalized) {
self.proxies.insert(
normalized,
ProxyInfo {
state: ProxyState::Unknown,
},
);
added += 1;
}
}
}
Err(e) => {
tracing::error!("failed to read body from source {source}: {e}");
}
},
Err(e) => {
tracing::error!("failed to fetch proxy source {source}: {e}");
}
}
}
Ok(added)
}
pub fn all_proxy_urls(&self) -> Vec<String> {
self.proxies.iter().map(|r| r.key().clone()).collect()
}
pub fn get_active_proxies(&self) -> Vec<String> {
self.proxies
.iter()
.filter(|r| r.value().state != ProxyState::Dead)
.map(|r| r.key().clone())
.collect()
}
pub fn set_state(&self, proxy: &str, state: ProxyState) {
if let Some(mut entry) = self.proxies.get_mut(proxy) {
entry.state = state;
}
}
pub fn get_state(&self, proxy: &str) -> ProxyState {
self.proxies
.get(proxy)
.map(|r| r.value().state)
.unwrap_or(ProxyState::Unknown)
}
pub fn proxy_counts(&self) -> (usize, usize, usize, usize) {
let mut total = 0usize;
let mut active_or_unknown = 0usize;
let mut dead = 0usize;
for entry in self.proxies.iter() {
total += 1;
match entry.value().state {
ProxyState::Dead => dead += 1,
_ => active_or_unknown += 1,
}
}
(total, active_or_unknown, 0, dead)
}
pub fn get_client(&self, proxy_url: &str) -> Result<reqwest::Client, ScatterProxyError> {
if let Some(client) = self.clients.get(proxy_url) {
return Ok(client.value().clone());
}
let proxy = reqwest::Proxy::all(proxy_url).map_err(|e| {
ScatterProxyError::Init(format!("invalid proxy URL '{proxy_url}': {e}"))
})?;
let client = reqwest::Client::builder()
.proxy(proxy)
.timeout(self.proxy_timeout)
.build()
.map_err(|e| {
ScatterProxyError::Init(format!(
"failed to build client for proxy '{proxy_url}': {e}"
))
})?;
self.clients.insert(proxy_url.to_string(), client.clone());
Ok(client)
}
pub fn normalize_proxy_url(raw: &str, prefer_remote_dns: bool) -> String {
let trimmed = raw.trim();
if trimmed.is_empty() || trimmed.starts_with('#') {
return String::new();
}
if trimmed.starts_with("socks5h://") {
return trimmed.to_string();
}
if let Some(rest) = trimmed.strip_prefix("socks5://") {
if prefer_remote_dns {
return format!("socks5h://{rest}");
}
return trimmed.to_string();
}
if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
return trimmed.to_string();
}
if prefer_remote_dns {
format!("socks5h://{trimmed}")
} else {
format!("socks5://{trimmed}")
}
}
#[allow(dead_code)]
pub fn total_count(&self) -> usize {
self.proxies.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn normalize_bare_ip_port_prefer_remote_dns() {
let result = ProxyManager::normalize_proxy_url("1.2.3.4:1080", true);
assert_eq!(result, "socks5h://1.2.3.4:1080");
}
#[test]
fn normalize_bare_ip_port_local_dns() {
let result = ProxyManager::normalize_proxy_url("1.2.3.4:1080", false);
assert_eq!(result, "socks5://1.2.3.4:1080");
}
#[test]
fn normalize_socks5_to_socks5h_when_prefer_remote() {
let result = ProxyManager::normalize_proxy_url("socks5://1.2.3.4:1080", true);
assert_eq!(result, "socks5h://1.2.3.4:1080");
}
#[test]
fn normalize_socks5_stays_when_not_prefer_remote() {
let result = ProxyManager::normalize_proxy_url("socks5://1.2.3.4:1080", false);
assert_eq!(result, "socks5://1.2.3.4:1080");
}
#[test]
fn normalize_socks5h_unchanged() {
let result = ProxyManager::normalize_proxy_url("socks5h://1.2.3.4:1080", true);
assert_eq!(result, "socks5h://1.2.3.4:1080");
}
#[test]
fn normalize_socks5h_unchanged_no_prefer() {
let result = ProxyManager::normalize_proxy_url("socks5h://1.2.3.4:1080", false);
assert_eq!(result, "socks5h://1.2.3.4:1080");
}
#[test]
fn normalize_empty_string() {
let result = ProxyManager::normalize_proxy_url("", true);
assert!(result.is_empty());
}
#[test]
fn normalize_whitespace_only() {
let result = ProxyManager::normalize_proxy_url(" ", true);
assert!(result.is_empty());
}
#[test]
fn normalize_comment_line() {
let result = ProxyManager::normalize_proxy_url("# this is a comment", true);
assert!(result.is_empty());
}
#[test]
fn normalize_trims_whitespace() {
let result = ProxyManager::normalize_proxy_url(" 1.2.3.4:1080 ", true);
assert_eq!(result, "socks5h://1.2.3.4:1080");
}
#[test]
fn normalize_http_proxy_passed_through() {
let result = ProxyManager::normalize_proxy_url("http://proxy.example.com:8080", true);
assert_eq!(result, "http://proxy.example.com:8080");
}
#[test]
fn normalize_https_proxy_passed_through() {
let result = ProxyManager::normalize_proxy_url("https://proxy.example.com:8080", false);
assert_eq!(result, "https://proxy.example.com:8080");
}
#[test]
fn normalize_socks5_with_auth() {
let result = ProxyManager::normalize_proxy_url("socks5://user:pass@1.2.3.4:1080", true);
assert_eq!(result, "socks5h://user:pass@1.2.3.4:1080");
}
#[test]
fn new_manager_is_empty() {
let pm = ProxyManager::new(Duration::from_secs(8));
assert_eq!(pm.total_count(), 0);
assert!(pm.all_proxy_urls().is_empty());
assert!(pm.get_active_proxies().is_empty());
}
#[test]
fn get_state_returns_unknown_for_missing_proxy() {
let pm = ProxyManager::new(Duration::from_secs(8));
assert_eq!(pm.get_state("socks5h://1.2.3.4:1080"), ProxyState::Unknown);
}
#[test]
fn set_and_get_state() {
let pm = ProxyManager::new(Duration::from_secs(8));
pm.proxies.insert(
"socks5h://1.2.3.4:1080".to_string(),
ProxyInfo {
state: ProxyState::Unknown,
},
);
assert_eq!(pm.get_state("socks5h://1.2.3.4:1080"), ProxyState::Unknown);
pm.set_state("socks5h://1.2.3.4:1080", ProxyState::Active);
assert_eq!(pm.get_state("socks5h://1.2.3.4:1080"), ProxyState::Active);
pm.set_state("socks5h://1.2.3.4:1080", ProxyState::Dead);
assert_eq!(pm.get_state("socks5h://1.2.3.4:1080"), ProxyState::Dead);
}
#[test]
fn set_state_on_missing_proxy_is_noop() {
let pm = ProxyManager::new(Duration::from_secs(8));
pm.set_state("socks5h://nonexistent:1080", ProxyState::Dead);
assert_eq!(pm.total_count(), 0);
}
#[test]
fn get_active_proxies_excludes_dead() {
let pm = ProxyManager::new(Duration::from_secs(8));
pm.proxies.insert(
"socks5h://alive:1080".to_string(),
ProxyInfo {
state: ProxyState::Active,
},
);
pm.proxies.insert(
"socks5h://unknown:1080".to_string(),
ProxyInfo {
state: ProxyState::Unknown,
},
);
pm.proxies.insert(
"socks5h://dead:1080".to_string(),
ProxyInfo {
state: ProxyState::Dead,
},
);
let active = pm.get_active_proxies();
assert_eq!(active.len(), 2);
assert!(active.contains(&"socks5h://alive:1080".to_string()));
assert!(active.contains(&"socks5h://unknown:1080".to_string()));
assert!(!active.contains(&"socks5h://dead:1080".to_string()));
}
#[test]
fn proxy_counts_correct() {
let pm = ProxyManager::new(Duration::from_secs(8));
pm.proxies.insert(
"a".to_string(),
ProxyInfo {
state: ProxyState::Active,
},
);
pm.proxies.insert(
"b".to_string(),
ProxyInfo {
state: ProxyState::Unknown,
},
);
pm.proxies.insert(
"c".to_string(),
ProxyInfo {
state: ProxyState::Dead,
},
);
pm.proxies.insert(
"d".to_string(),
ProxyInfo {
state: ProxyState::Dead,
},
);
let (total, active_or_unknown, cooldown, dead) = pm.proxy_counts();
assert_eq!(total, 4);
assert_eq!(active_or_unknown, 2);
assert_eq!(cooldown, 0);
assert_eq!(dead, 2);
}
#[test]
fn proxy_counts_empty() {
let pm = ProxyManager::new(Duration::from_secs(8));
let (total, active, cooldown, dead) = pm.proxy_counts();
assert_eq!(total, 0);
assert_eq!(active, 0);
assert_eq!(cooldown, 0);
assert_eq!(dead, 0);
}
#[test]
fn get_client_creates_and_caches() {
let pm = ProxyManager::new(Duration::from_secs(8));
let client1 = pm.get_client("socks5h://127.0.0.1:1080").unwrap();
let client2 = pm.get_client("socks5h://127.0.0.1:1080").unwrap();
assert_eq!(pm.clients.len(), 1);
drop(client1);
drop(client2);
}
#[test]
fn get_client_different_proxies_get_different_clients() {
let pm = ProxyManager::new(Duration::from_secs(8));
let _c1 = pm.get_client("socks5h://127.0.0.1:1080").unwrap();
let _c2 = pm.get_client("socks5h://127.0.0.1:1081").unwrap();
assert_eq!(pm.clients.len(), 2);
}
#[test]
fn get_client_invalid_url_returns_error() {
let pm = ProxyManager::new(Duration::from_secs(8));
let result = pm.get_client("not a valid url at all");
if let Err(e) = result {
match e {
ScatterProxyError::Init(msg) => {
assert!(msg.contains("proxy") || msg.contains("invalid"));
}
other => panic!("expected Init error, got: {other:?}"),
}
}
}
#[test]
fn all_proxy_urls_returns_all() {
let pm = ProxyManager::new(Duration::from_secs(8));
pm.proxies.insert(
"a".to_string(),
ProxyInfo {
state: ProxyState::Active,
},
);
pm.proxies.insert(
"b".to_string(),
ProxyInfo {
state: ProxyState::Dead,
},
);
let urls = pm.all_proxy_urls();
assert_eq!(urls.len(), 2);
assert!(urls.contains(&"a".to_string()));
assert!(urls.contains(&"b".to_string()));
}
#[test]
fn total_count_tracks_insertions() {
let pm = ProxyManager::new(Duration::from_secs(8));
assert_eq!(pm.total_count(), 0);
pm.proxies.insert(
"x".to_string(),
ProxyInfo {
state: ProxyState::Unknown,
},
);
assert_eq!(pm.total_count(), 1);
pm.proxies.insert(
"y".to_string(),
ProxyInfo {
state: ProxyState::Unknown,
},
);
assert_eq!(pm.total_count(), 2);
}
#[test]
fn proxy_state_debug() {
assert_eq!(format!("{:?}", ProxyState::Unknown), "Unknown");
assert_eq!(format!("{:?}", ProxyState::Active), "Active");
assert_eq!(format!("{:?}", ProxyState::Dead), "Dead");
}
#[test]
fn proxy_state_clone_and_eq() {
let s = ProxyState::Active;
let s2 = s;
assert_eq!(s, s2);
}
#[tokio::test]
async fn fetch_and_add_with_empty_sources() {
let pm = ProxyManager::new(Duration::from_secs(8));
let added = pm.fetch_and_add(&[], true).await.unwrap();
assert_eq!(added, 0);
assert_eq!(pm.total_count(), 0);
}
#[tokio::test]
async fn fetch_and_add_with_unreachable_source_logs_error() {
let pm = ProxyManager::new(Duration::from_secs(8));
let added = pm
.fetch_and_add(
&["http://127.0.0.1:1/nonexistent-proxy-list".to_string()],
true,
)
.await
.unwrap();
assert_eq!(added, 0);
assert_eq!(pm.total_count(), 0);
}
#[test]
fn normalize_comment_with_leading_whitespace() {
let result = ProxyManager::normalize_proxy_url(" # comment", true);
assert!(result.is_empty());
}
#[test]
fn normalize_ipv6_bare() {
let result = ProxyManager::normalize_proxy_url("[::1]:1080", true);
assert_eq!(result, "socks5h://[::1]:1080");
}
#[test]
fn normalize_socks5_uppercase_not_matched_becomes_bare() {
let result = ProxyManager::normalize_proxy_url("SOCKS5://1.2.3.4:1080", true);
assert_eq!(result, "socks5h://SOCKS5://1.2.3.4:1080");
}
}