use std::time::Duration;
use scatter_proxy::{
BodyClassifier, BodyVerdict, DefaultClassifier, HeaderMap, RateLimitConfig, ScatterProxy,
ScatterProxyConfig, ScatterProxyError, StatusCode,
};
/// A pool built from `ScatterProxyConfig::default()` — i.e. with no proxy
/// sources — starts successfully and reports an empty proxy set.
#[tokio::test]
async fn new_with_no_sources_succeeds() {
    let pool = ScatterProxy::new(ScatterProxyConfig::default(), DefaultClassifier)
        .await
        .unwrap();
    assert_eq!(pool.metrics().total_proxies, 0);
    pool.shutdown().await;
}
/// A freshly constructed, empty pool reports zero for every proxy and task
/// counter, has no circuit breakers, and shows zero throughput in every window.
#[tokio::test]
async fn metrics_starts_at_zero() {
    let pool = ScatterProxy::new(ScatterProxyConfig::default(), DefaultClassifier)
        .await
        .unwrap();
    let metrics = pool.metrics();

    // Proxy-state counters.
    assert_eq!(
        (
            metrics.total_proxies,
            metrics.healthy_proxies,
            metrics.cooldown_proxies,
            metrics.dead_proxies,
        ),
        (0, 0, 0, 0)
    );
    // Task-state counters.
    assert_eq!(
        (
            metrics.pending_tasks,
            metrics.completed_tasks,
            metrics.failed_tasks,
            metrics.inflight,
        ),
        (0, 0, 0, 0)
    );
    assert!(metrics.circuit_breakers.is_empty());

    // Throughput values are floats, so compare against zero with a tolerance.
    for rate in [
        metrics.throughput_1s,
        metrics.throughput_10s,
        metrics.throughput_60s,
    ] {
        assert!(rate.abs() < f64::EPSILON);
    }
    pool.shutdown().await;
}
/// With a task pool of capacity 2, the first two submissions are accepted and
/// the third is rejected with `PoolFull { capacity: 2 }`.
#[tokio::test]
async fn submit_returns_pool_full_when_capacity_exhausted() {
    let pool = ScatterProxy::new(
        ScatterProxyConfig {
            task_pool_capacity: 2,
            ..Default::default()
        },
        DefaultClassifier,
    )
    .await
    .unwrap();
    let client = reqwest::Client::new();
    let build = |url: &str| client.get(url).build().unwrap();

    // Fill the pool to capacity. The handles are bound (not discarded) so
    // the accepted tasks stay alive for the rest of the test.
    let _first = pool.submit(build("http://example.com/1")).unwrap();
    let _second = pool.submit(build("http://example.com/2")).unwrap();

    let result = pool.submit(build("http://example.com/3"));
    assert!(
        matches!(result, Err(ScatterProxyError::PoolFull { capacity: 2 })),
        "expected PoolFull {{ capacity: 2 }}, got: {result:?}"
    );
    pool.shutdown().await;
}
/// A batch of five requests submitted to a roomy pool yields one handle per
/// request, and all five show up as pending.
#[tokio::test]
async fn submit_batch_adds_all_tasks() {
    let config = ScatterProxyConfig {
        task_pool_capacity: 100,
        ..Default::default()
    };
    let pool = ScatterProxy::new(config, DefaultClassifier).await.unwrap();
    let client = reqwest::Client::new();

    let mut batch = Vec::with_capacity(5);
    for i in 0..5 {
        batch.push(
            client
                .get(format!("http://example.com/{i}"))
                .build()
                .unwrap(),
        );
    }

    let handles = pool.submit_batch(batch).unwrap();
    assert_eq!(handles.len(), 5);
    assert_eq!(pool.metrics().pending_tasks, 5);
    pool.shutdown().await;
}
/// A batch that does not fit (5 requests, capacity 3) is rejected with
/// `PoolFull`, and none of its requests end up pending.
#[tokio::test]
async fn submit_batch_rejects_when_insufficient_capacity() {
    let pool = ScatterProxy::new(
        ScatterProxyConfig {
            task_pool_capacity: 3,
            ..Default::default()
        },
        DefaultClassifier,
    )
    .await
    .unwrap();
    let client = reqwest::Client::new();

    let mut requests = Vec::new();
    for i in 0..5 {
        let request = client
            .get(format!("http://example.com/{i}"))
            .build()
            .unwrap();
        requests.push(request);
    }

    let result = pool.submit_batch(requests);
    assert!(
        matches!(result, Err(ScatterProxyError::PoolFull { .. })),
        "expected PoolFull, got: {result:?}"
    );
    // Rejection is all-or-nothing: no part of the batch may be enqueued.
    assert_eq!(pool.metrics().pending_tasks, 0);
    pool.shutdown().await;
}
/// Submitting an empty batch succeeds, returns no handles, and enqueues
/// nothing.
#[tokio::test]
async fn submit_batch_empty_is_ok() {
    let pool = ScatterProxy::new(ScatterProxyConfig::default(), DefaultClassifier)
        .await
        .unwrap();
    let handles = pool.submit_batch(Vec::new()).unwrap();
    assert!(handles.is_empty());
    assert_eq!(pool.metrics().pending_tasks, 0);
    pool.shutdown().await;
}
/// Smoke test: a default pool can be constructed and then shut down cleanly.
///
/// NOTE(review): despite the name, `shutdown` is called only once here, so
/// idempotence is not actually exercised. If `shutdown` takes `&self`, a
/// second call could be added — TODO confirm its signature.
#[tokio::test]
async fn shutdown_is_idempotent_like() {
    ScatterProxy::new(ScatterProxyConfig::default(), DefaultClassifier)
        .await
        .unwrap()
        .shutdown()
        .await;
}
/// Round-trips the persisted state file through shutdown and reload.
///
/// The first pool must create `state_file` when it shuts down. A second pool
/// configured with the same path must then start from that file. Because
/// neither pool has sources, the reloaded pool must hold no proxies.
#[tokio::test]
async fn state_persistence_round_trip() {
    /// Deletes the wrapped path when dropped, so the temp file is cleaned up
    /// even if an assertion below panics.
    struct RemoveOnDrop(std::path::PathBuf);
    impl Drop for RemoveOnDrop {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    // The temp dir is shared machine-wide. Including the process id keeps
    // concurrent test runs (several targets, parallel CI jobs) from deleting
    // or overwriting each other's state file between shutdown and the
    // `exists` check.
    let tmp = std::env::temp_dir().join(format!(
        "scatter_proxy_test_state_{}.json",
        std::process::id()
    ));
    let _ = std::fs::remove_file(&tmp);
    let _cleanup = RemoveOnDrop(tmp.clone());

    let config = ScatterProxyConfig {
        state_file: Some(tmp.clone()),
        ..Default::default()
    };
    let pool = ScatterProxy::new(config, DefaultClassifier).await.unwrap();
    pool.shutdown().await;
    assert!(
        tmp.exists(),
        "state file should be created on shutdown at {tmp:?}"
    );

    let config2 = ScatterProxyConfig {
        state_file: Some(tmp.clone()),
        ..Default::default()
    };
    let pool2 = ScatterProxy::new(config2, DefaultClassifier).await.unwrap();
    // No sources were configured and the persisted pool held no proxies, so
    // loading the state must not produce any.
    assert_eq!(pool2.metrics().total_proxies, 0);
    pool2.shutdown().await;
    // `_cleanup` removes the state file when it goes out of scope here.
}
/// A pool can be constructed with many non-default settings. These cover the
/// timeouts, concurrency limits, attempt count, pool capacity, remote-DNS
/// preference, and a per-host rate-limit override.
///
/// NOTE(review): the test only checks that construction succeeds and that the
/// proxy set is empty. It does not observe the overridden values taking
/// effect.
#[tokio::test]
async fn config_overrides_propagate() {
    // One slow host gets a longer per-request interval than the default.
    let mut host_overrides = std::collections::HashMap::new();
    host_overrides.insert("slow.example.com".into(), Duration::from_secs(2));

    let rate_limit = RateLimitConfig {
        default_interval: Duration::from_millis(100),
        host_overrides,
    };
    let config = ScatterProxyConfig {
        proxy_timeout: Duration::from_secs(20),
        task_timeout: Duration::from_secs(120),
        max_concurrent_per_request: 10,
        max_inflight: 50,
        max_attempts: 15,
        task_pool_capacity: 500,
        prefer_remote_dns: false,
        rate_limit,
        ..Default::default()
    };

    let pool = ScatterProxy::new(config, DefaultClassifier).await.unwrap();
    assert_eq!(pool.metrics().total_proxies, 0);
    pool.shutdown().await;
}
/// Any `BodyClassifier` implementation, not just `DefaultClassifier`, can be
/// plugged into the pool.
#[tokio::test]
async fn custom_classifier_is_accepted() {
    /// Trivial classifier that reports every response as a success.
    struct AlwaysSuccess;

    impl BodyClassifier for AlwaysSuccess {
        fn classify(&self, _: StatusCode, _: &HeaderMap, _: &[u8]) -> BodyVerdict {
            BodyVerdict::Success
        }
    }

    let pool = ScatterProxy::new(ScatterProxyConfig::default(), AlwaysSuccess)
        .await
        .unwrap();
    assert_eq!(pool.metrics().total_proxies, 0);
    pool.shutdown().await;
}
/// Ten submitted tasks show up as pending, with nothing yet completed or
/// failed.
#[tokio::test]
async fn metrics_reflects_submitted_tasks() {
    let pool = ScatterProxy::new(
        ScatterProxyConfig {
            task_pool_capacity: 50,
            ..Default::default()
        },
        DefaultClassifier,
    )
    .await
    .unwrap();
    let client = reqwest::Client::new();

    // Each returned handle is dropped right away. The assertion below shows
    // the tasks still count as pending.
    (0..10)
        .map(|i| {
            client
                .get(format!("http://example.com/{i}"))
                .build()
                .unwrap()
        })
        .for_each(|request| {
            pool.submit(request).unwrap();
        });

    let snapshot = pool.metrics();
    assert_eq!(
        (
            snapshot.pending_tasks,
            snapshot.completed_tasks,
            snapshot.failed_tasks,
        ),
        (10, 0, 0)
    );
    pool.shutdown().await;
}
/// `PoolMetrics` snapshots can be cloned, clones agree with the original,
/// and the `Debug` output names the type.
#[tokio::test]
async fn pool_metrics_is_clone_and_debug() {
    let pool = ScatterProxy::new(ScatterProxyConfig::default(), DefaultClassifier)
        .await
        .unwrap();
    let original = pool.metrics();
    let m2 = original.clone();
    assert!(format!("{m2:?}").contains("PoolMetrics"));
    assert_eq!(original.total_proxies, m2.total_proxies);
    pool.shutdown().await;
}
/// Third-party SOCKS5 proxy lists (raw text files hosted on GitHub), used as
/// `sources` by the network-dependent `real_proxy_*` tests.
///
/// These lists are external, so those tests only assert that at least one
/// proxy was fetched, not a specific count.
const PROXY_SOURCES: &[&str] = &[
    "https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks5.txt",
    "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks5.txt",
];
/// Returns `true` when the network-dependent integration tests are enabled.
///
/// They are enabled when the `SCATTER_INTEGRATION` environment variable is
/// exactly `"1"` or `"true"`. An unset variable, any other value, or a
/// non-UTF-8 value all count as disabled.
fn should_run_integration() -> bool {
    matches!(
        std::env::var("SCATTER_INTEGRATION").as_deref(),
        Ok("1" | "true")
    )
}
/// Network test: fetch live proxies from `PROXY_SOURCES`, then send one
/// request to httpbin through the pool.
///
/// A failed request is only reported, since free proxies are unreliable. A
/// response that does come back must have a 2xx status. The test is
/// `#[ignore]`d and also returns early unless `SCATTER_INTEGRATION` is `1` or
/// `true`.
#[tokio::test]
#[ignore]
async fn real_proxy_fetch_httpbin() {
    if !should_run_integration() {
        return;
    }
    let _ = tracing_subscriber::fmt()
        .with_env_filter("scatter_proxy=debug")
        .try_init();

    let rate_limit = RateLimitConfig {
        default_interval: Duration::from_millis(200),
        ..Default::default()
    };
    let config = ScatterProxyConfig {
        sources: PROXY_SOURCES.iter().copied().map(String::from).collect(),
        max_concurrent_per_request: 5,
        proxy_timeout: Duration::from_secs(15),
        task_timeout: Duration::from_secs(45),
        max_attempts: 8,
        rate_limit,
        ..Default::default()
    };
    let pool = ScatterProxy::new(config, DefaultClassifier).await.unwrap();

    let fetched = pool.metrics().total_proxies;
    assert!(fetched > 0, "should have fetched proxies from sources");
    println!("Fetched {} proxies", fetched);

    let client = reqwest::Client::new();
    let request = client.get("http://httpbin.org/ip").build().unwrap();
    let outcome = pool.submit(request).unwrap().await;
    match outcome {
        Ok(resp) => {
            let body = String::from_utf8_lossy(&resp.body);
            println!("Success! Status: {}, Body: {}", resp.status, body);
            assert!(resp.status.is_success());
        }
        Err(e) => println!("Request failed (expected with free proxies): {e}"),
    }

    let after = pool.metrics();
    println!(
        "Final metrics: throughput={:.1}/s success={:.0}%",
        after.throughput_10s,
        after.success_rate_1m * 100.0
    );
    pool.shutdown().await;
}
/// Network test: submit three httpbin requests as one batch and report the
/// outcome of each, plus how many succeeded.
///
/// Individual failures are tolerated; the success count is printed, not
/// asserted.
#[tokio::test]
#[ignore]
async fn real_proxy_batch_requests() {
    if !should_run_integration() {
        return;
    }
    let _ = tracing_subscriber::fmt()
        .with_env_filter("scatter_proxy=info")
        .try_init();

    let pool = ScatterProxy::new(
        ScatterProxyConfig {
            sources: PROXY_SOURCES.iter().copied().map(String::from).collect(),
            max_concurrent_per_request: 3,
            proxy_timeout: Duration::from_secs(10),
            task_timeout: Duration::from_secs(30),
            max_attempts: 5,
            ..Default::default()
        },
        DefaultClassifier,
    )
    .await
    .unwrap();
    let client = reqwest::Client::new();

    let urls = [
        "http://httpbin.org/ip",
        "http://httpbin.org/user-agent",
        "http://httpbin.org/headers",
    ];
    let mut requests = Vec::with_capacity(urls.len());
    for url in urls {
        requests.push(client.get(url).build().unwrap());
    }
    let handles = pool.submit_batch(requests).unwrap();

    // Handles come back in submission order, so pair each with its URL.
    let mut successes = 0;
    for (handle, url) in handles.into_iter().zip(urls) {
        let resp = match handle.await {
            Ok(resp) => resp,
            Err(e) => {
                println!("{url}: {e}");
                continue;
            }
        };
        println!("{url}: {} ({} bytes)", resp.status, resp.body.len());
        successes += 1;
    }
    println!("{successes}/{} requests succeeded", urls.len());
    pool.shutdown().await;
}
/// Network test: fetch httpbin's JSON endpoint through the pool using a
/// classifier that only accepts 2xx responses whose body parses as JSON.
///
/// A failed request is tolerated. A successful one is re-parsed and printed.
#[tokio::test]
#[ignore]
async fn real_proxy_custom_classifier() {
    /// Classifies responses as follows:
    /// - 5xx is a target error.
    /// - A 2xx with a JSON body is a success.
    /// - Anything else counts as the proxy being blocked.
    struct JsonClassifier;

    impl BodyClassifier for JsonClassifier {
        fn classify(&self, status: StatusCode, _headers: &HeaderMap, body: &[u8]) -> BodyVerdict {
            if status.is_server_error() {
                BodyVerdict::TargetError
            } else if status.is_success()
                && serde_json::from_slice::<serde_json::Value>(body).is_ok()
            {
                BodyVerdict::Success
            } else {
                BodyVerdict::ProxyBlocked
            }
        }
    }

    if !should_run_integration() {
        return;
    }
    let _ = tracing_subscriber::fmt()
        .with_env_filter("scatter_proxy=debug")
        .try_init();

    let config = ScatterProxyConfig {
        sources: PROXY_SOURCES.iter().copied().map(String::from).collect(),
        proxy_timeout: Duration::from_secs(10),
        task_timeout: Duration::from_secs(30),
        ..Default::default()
    };
    let pool = ScatterProxy::new(config, JsonClassifier).await.unwrap();
    let client = reqwest::Client::new();
    let handle = pool
        .submit(client.get("http://httpbin.org/json").build().unwrap())
        .unwrap();

    match handle.await {
        Err(e) => println!("Failed (expected with free proxies): {e}"),
        Ok(resp) => {
            // The classifier only reports Success for JSON bodies, so this
            // re-parse should not fail — assuming the pool only yields `Ok`
            // for a `Success` verdict.
            let value: serde_json::Value = serde_json::from_slice(&resp.body).unwrap();
            println!("Got valid JSON: {value}");
        }
    }
    pool.shutdown().await;
}
/// Network test: push five httpbin requests through the pool and wait for
/// all of them. It then prints a metrics snapshot and checks that at least
/// one task reached a terminal state (completed or failed).
#[tokio::test]
#[ignore]
async fn real_proxy_metrics_after_load() {
    if !should_run_integration() {
        return;
    }
    let _ = tracing_subscriber::fmt()
        .with_env_filter("scatter_proxy=info")
        .try_init();

    let config = ScatterProxyConfig {
        sources: PROXY_SOURCES.iter().copied().map(String::from).collect(),
        max_concurrent_per_request: 5,
        proxy_timeout: Duration::from_secs(8),
        task_timeout: Duration::from_secs(20),
        max_attempts: 3,
        ..Default::default()
    };
    let pool = ScatterProxy::new(config, DefaultClassifier).await.unwrap();
    let client = reqwest::Client::new();

    let handles: Vec<_> = (0..5)
        .map(|i| {
            let req = client
                .get(format!("http://httpbin.org/anything/{i}"))
                .build()
                .unwrap();
            pool.submit(req).unwrap()
        })
        .collect();
    // Individual outcomes are irrelevant; only the resulting metrics matter.
    for handle in handles {
        let _ = handle.await;
    }

    let snapshot = pool.metrics();
    println!("Metrics after load:");
    println!(" total_proxies: {}", snapshot.total_proxies);
    println!(
        " healthy: {}, cooldown: {}, dead: {}",
        snapshot.healthy_proxies, snapshot.cooldown_proxies, snapshot.dead_proxies
    );
    println!(
        " completed: {}, failed: {}",
        snapshot.completed_tasks, snapshot.failed_tasks
    );
    println!(" throughput: {:.1}/s", snapshot.throughput_10s);
    println!(" success_rate: {:.0}%", snapshot.success_rate_1m * 100.0);
    println!(" avg_latency: {:.0}ms", snapshot.avg_latency_ms);
    assert!(
        snapshot.completed_tasks + snapshot.failed_tasks > 0,
        "some tasks should have resolved"
    );
    pool.shutdown().await;
}
/// Network test: persist a pool populated from `PROXY_SOURCES` to a state
/// file, then reload it.
///
/// The first pool must write a non-trivial state file on shutdown. A second
/// pool configured with the same file and sources must start with a
/// non-empty proxy set.
#[tokio::test]
#[ignore]
async fn real_proxy_state_persistence() {
    /// Deletes the wrapped path when dropped, so the temp file is cleaned up
    /// even if an assertion below panics.
    struct RemoveOnDrop(std::path::PathBuf);
    impl Drop for RemoveOnDrop {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    if !should_run_integration() {
        return;
    }
    let _ = tracing_subscriber::fmt()
        .with_env_filter("scatter_proxy=debug")
        .try_init();

    // The temp dir is shared machine-wide. A per-process file name keeps
    // concurrent runs from deleting or overwriting each other's state file.
    let tmp = std::env::temp_dir().join(format!(
        "scatter_proxy_integration_state_{}.json",
        std::process::id()
    ));
    let _ = std::fs::remove_file(&tmp);
    let _cleanup = RemoveOnDrop(tmp.clone());

    let config = ScatterProxyConfig {
        sources: PROXY_SOURCES.iter().map(|s| s.to_string()).collect(),
        state_file: Some(tmp.clone()),
        ..Default::default()
    };
    let pool = ScatterProxy::new(config, DefaultClassifier).await.unwrap();
    let count = pool.metrics().total_proxies;
    println!("Phase 1: fetched {count} proxies, shutting down…");
    assert!(count > 0, "should have fetched proxies");
    pool.shutdown().await;

    assert!(tmp.exists(), "state file should exist after shutdown");
    let file_size = std::fs::metadata(&tmp).unwrap().len();
    println!("State file size: {file_size} bytes");
    // Presumably rules out an empty `{}` / `[]` JSON document.
    assert!(file_size > 2, "state file should contain meaningful data");

    let config2 = ScatterProxyConfig {
        sources: PROXY_SOURCES.iter().map(|s| s.to_string()).collect(),
        state_file: Some(tmp.clone()),
        ..Default::default()
    };
    let pool2 = ScatterProxy::new(config2, DefaultClassifier).await.unwrap();
    let count2 = pool2.metrics().total_proxies;
    println!("Phase 2: loaded {count2} proxies from persisted state + sources");
    assert!(count2 > 0, "should have proxies after reload");
    pool2.shutdown().await;
    // `_cleanup` removes the state file when it goes out of scope here.
}