mod classifier;
mod config;
mod error;
mod health;
mod metrics;
mod persist;
mod proxy;
mod rate_limit;
mod router;
mod scheduler;
mod score;
mod task;
pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
pub use config::{RateLimitConfig, ScatterProxyConfig, DEFAULT_PROXY_SOURCES};
pub use error::ScatterProxyError;
pub use metrics::{PoolMetrics, ProxyHostStats};
pub use router::ScatterProxyRouter;
pub use task::{ScatterResponse, TaskHandle};
pub use http::HeaderMap;
pub use http::StatusCode;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, Semaphore};
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
/// Handle to a running scatter-proxy pool.
///
/// Owns the shared components (task pool, health tracker, rate limiter,
/// proxy manager, throughput tracker, in-flight semaphore) together with the
/// join handles of the background tasks spawned by `new_arc`, and the
/// broadcast sender used to tell those tasks to stop.
// NOTE(review): `dead_code` is presumably allowed because several fields
// (e.g. `rate_limiter` and the `_*_handle` fields) are stored but not read
// again in this file — confirm before removing the attribute.
#[allow(dead_code)]
pub struct ScatterProxy {
/// Immutable configuration, shared with the background tasks.
config: Arc<ScatterProxyConfig>,
/// Queue that `submit*` calls delegate to; also the source of task counters.
task_pool: Arc<task::TaskPool>,
/// Success/failure/latency bookkeeping; restored from and saved to the state file.
health: Arc<health::HealthTracker>,
/// Rate limiter built from `config.rate_limit` and handed to the scheduler.
rate_limiter: Arc<rate_limit::RateLimiter>,
/// Holds the known proxies and their states.
proxy_manager: Arc<proxy::ProxyManager>,
/// Throughput tracker queried over 1s/10s/60s windows by `metrics`.
throughput: Arc<metrics::ThroughputTracker>,
/// Created with `config.max_inflight` permits; used to derive the in-flight count.
semaphore: Arc<Semaphore>,
/// Sending `()` on this channel asks every background task to exit.
shutdown_tx: broadcast::Sender<()>,
/// Join handle of the scheduler task.
_scheduler_handle: JoinHandle<()>,
/// Join handle of the periodic state-save task; `None` when no state file is configured.
_persist_handle: Option<JoinHandle<()>>,
/// Join handle of the periodic metrics-logging task.
_metrics_handle: JoinHandle<()>,
/// Join handle of the periodic proxy-source refresh task.
_refresh_handle: JoinHandle<()>,
}
impl ScatterProxy {
/// Creates a [`ScatterProxy`] from `config`, taking ownership of `classifier`.
///
/// The classifier is moved behind an `Arc<dyn BodyClassifier>` and the rest of
/// the construction is delegated to `new_arc`.
///
/// # Errors
///
/// Propagates whatever error `new_arc` returns.
pub async fn new(
    config: ScatterProxyConfig,
    classifier: impl BodyClassifier,
) -> Result<Self, ScatterProxyError> {
    let shared_classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
    Self::new_arc(config, shared_classifier).await
}
pub(crate) async fn new_arc(
config: ScatterProxyConfig,
classifier: Arc<dyn BodyClassifier>,
) -> Result<Self, ScatterProxyError> {
let metrics_name = config.name.clone();
let config = Arc::new(config);
let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
let health = Arc::new(health::HealthTracker::new(config.health_window));
let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
let throughput = Arc::new(metrics::ThroughputTracker::new());
let semaphore = Arc::new(Semaphore::new(config.max_inflight));
let effective_sources: Vec<String> = if config.sources.is_empty() {
info!("no custom sources configured, using default free proxy sources");
DEFAULT_PROXY_SOURCES
.iter()
.map(|s| s.to_string())
.collect()
} else {
config.sources.clone()
};
let proxy_count = proxy_manager
.fetch_and_add(&effective_sources, config.prefer_remote_dns)
.await?;
if let Some(ref state_path) = config.state_file {
match persist::load_state(state_path).await {
Ok(Some(persisted)) => {
for (proxy_url, persisted_proxy) in &persisted.proxies {
for (host, stats) in &persisted_proxy.hosts {
health.restore(proxy_url, host, stats);
}
match persisted_proxy.state.as_str() {
"Active" => {
proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
}
"Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
_ => {} }
}
debug!(
proxies = persisted.proxies.len(),
"restored persisted state"
);
}
Ok(None) => {
debug!("no persisted state file found, starting fresh");
}
Err(e) => {
warn!(error = %e, "failed to load persisted state, starting fresh");
}
}
}
info!(count = proxy_count, "loaded proxies from sources");
let (shutdown_tx, _) = broadcast::channel(1);
let sched = scheduler::Scheduler::new(
Arc::clone(&config),
Arc::clone(&task_pool),
Arc::clone(&health),
Arc::clone(&rate_limiter),
Arc::clone(&proxy_manager),
Arc::clone(&classifier),
Arc::clone(&semaphore),
Arc::clone(&throughput),
);
let shutdown_rx_sched = shutdown_tx.subscribe();
let _scheduler_handle = tokio::spawn(async move {
sched.run(shutdown_rx_sched).await;
});
let _persist_handle = if config.state_file.is_some() {
let persist_config = Arc::clone(&config);
let persist_health = Arc::clone(&health);
let persist_proxy_mgr = Arc::clone(&proxy_manager);
let mut shutdown_rx_persist = shutdown_tx.subscribe();
let interval = config.state_save_interval;
Some(tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_rx_persist.recv() => {
debug!("persist task received shutdown signal");
break;
}
_ = tokio::time::sleep(interval) => {
if let Some(ref path) = persist_config.state_file {
let health_stats = persist_health.get_all_stats();
let proxy_states = build_proxy_states(&persist_proxy_mgr);
if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
warn!(error = %e, "failed to save state");
} else {
debug!("persisted state to disk");
}
}
}
}
}
}))
} else {
None
};
let _metrics_config = Arc::clone(&config);
let metrics_task_pool = Arc::clone(&task_pool);
let metrics_health = Arc::clone(&health);
let metrics_proxy_mgr = Arc::clone(&proxy_manager);
let metrics_throughput = Arc::clone(&throughput);
let metrics_semaphore = Arc::clone(&semaphore);
let mut shutdown_rx_metrics = shutdown_tx.subscribe();
let metrics_interval = config.metrics_log_interval;
let max_inflight = config.max_inflight;
let _metrics_handle = tokio::spawn(async move {
let prefix = metrics_name
.as_deref()
.map_or_else(String::new, |n| format!("[{n}] "));
loop {
tokio::select! {
_ = shutdown_rx_metrics.recv() => {
debug!("metrics task received shutdown signal");
break;
}
_ = tokio::time::sleep(metrics_interval) => {
let tp = metrics_throughput.throughput(Duration::from_secs(10));
let total_s = metrics_health.total_success();
let total_f = metrics_health.total_fail();
let total = total_s + total_f;
let success_pct = if total > 0 {
(total_s as f64 / total as f64) * 100.0
} else {
0.0
};
let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
let pending = metrics_task_pool.pending_count();
let delayed = metrics_task_pool.delayed_count();
let done = metrics_task_pool.completed_count();
let failed = metrics_task_pool.failed_count();
let requeued = metrics_task_pool.requeued_count();
let zero_available = metrics_task_pool.zero_available_count();
let skipped_no_permit = metrics_task_pool.skipped_no_permit_count();
let skipped_rate_limit = metrics_task_pool.skipped_rate_limit_count();
let skipped_cooldown = metrics_task_pool.skipped_cooldown_count();
let dispatches = metrics_task_pool.dispatch_count();
let inflight = max_inflight - metrics_semaphore.available_permits();
info!(
"{prefix}throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} delayed {} done {} failed requeued={} zero_avail={} dispatch={} skip(no_permit/rate/cooldown)={}/{}/{} | inflight={}",
tp,
success_pct,
healthy,
cooldown,
dead,
pending,
delayed,
done,
failed,
requeued,
zero_available,
dispatches,
skipped_no_permit,
skipped_rate_limit,
skipped_cooldown,
inflight
);
}
}
}
});
let effective_sources = Arc::new(effective_sources);
let refresh_sources = Arc::clone(&effective_sources);
let refresh_config = Arc::clone(&config);
let refresh_proxy_mgr = Arc::clone(&proxy_manager);
let mut shutdown_rx_refresh = shutdown_tx.subscribe();
let refresh_interval = config.source_refresh_interval;
let _refresh_handle = tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_rx_refresh.recv() => {
debug!("refresh task received shutdown signal");
break;
}
_ = tokio::time::sleep(refresh_interval) => {
match refresh_proxy_mgr
.fetch_and_add(&refresh_sources, refresh_config.prefer_remote_dns)
.await
{
Ok(count) => {
debug!(new_count = count, "refreshed proxy sources");
}
Err(e) => {
warn!(error = %e, "failed to refresh proxy sources");
}
}
}
}
}
});
Ok(ScatterProxy {
config,
task_pool,
health,
rate_limiter,
proxy_manager,
throughput,
semaphore,
shutdown_tx,
_scheduler_handle,
_persist_handle,
_metrics_handle,
_refresh_handle,
})
}
/// Hands `request` to the task pool and returns the resulting [`TaskHandle`].
///
/// Pure delegation to `TaskPool::submit`.
// NOTE(review): presumably this waits for pool capacity, given that a
// fallible `try_submit` exists alongside it — confirm in `task::TaskPool`.
pub async fn submit(&self, request: reqwest::Request) -> TaskHandle {
    let pool = &self.task_pool;
    pool.submit(request).await
}
/// Non-async variant of `submit`: hands `request` to the task pool without
/// awaiting.
///
/// # Errors
///
/// Returns whatever [`ScatterProxyError`] `TaskPool::try_submit` reports.
pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
    let pool = &self.task_pool;
    pool.try_submit(request)
}
/// Hands a batch of requests to the task pool and returns the handles it
/// produces.
///
/// Pure delegation to `TaskPool::submit_batch`.
pub async fn submit_batch(&self, requests: Vec<reqwest::Request>) -> Vec<TaskHandle> {
    let pool = &self.task_pool;
    pool.submit_batch(requests).await
}
/// Non-async variant of `submit_batch`: hands the whole batch to the task
/// pool without awaiting.
///
/// # Errors
///
/// Returns whatever [`ScatterProxyError`] `TaskPool::try_submit_batch` reports.
pub fn try_submit_batch(
    &self,
    requests: Vec<reqwest::Request>,
) -> Result<Vec<TaskHandle>, ScatterProxyError> {
    let pool = &self.task_pool;
    pool.try_submit_batch(requests)
}
/// Hands `request` to the task pool, passing `timeout` through to
/// `TaskPool::submit_timeout`.
///
/// # Errors
///
/// Returns whatever [`ScatterProxyError`] `TaskPool::submit_timeout` reports.
// NOTE(review): presumably the error case is "no capacity within `timeout`" —
// confirm in `task::TaskPool`.
pub async fn submit_timeout(
    &self,
    request: reqwest::Request,
    timeout: Duration,
) -> Result<TaskHandle, ScatterProxyError> {
    let pool = &self.task_pool;
    pool.submit_timeout(request, timeout).await
}
/// Takes a point-in-time snapshot of the pool's counters.
///
/// Proxy counts come from the proxy manager, task counters from the task
/// pool, throughput from the throughput tracker (1s/10s/60s windows) and
/// success/latency figures from the health tracker.
///
/// The individual values are read one after another without a common lock,
/// so the snapshot may be slightly inconsistent while the pool is busy.
pub fn metrics(&self) -> PoolMetrics {
    let (total, healthy, cooldown, dead) = self.proxy_manager.proxy_counts();

    // The success rate is a fraction in [0, 1], and 0.0 when nothing has
    // been recorded yet.
    // NOTE(review): it is reported as `success_rate_1m` but derived from
    // `total_success`/`total_fail`; whether those are windowed to one
    // minute depends on `HealthTracker` — confirm.
    let total_s = self.health.total_success();
    let total_f = self.health.total_fail();
    let total_requests = total_s + total_f;
    let success_rate = if total_requests > 0 {
        total_s as f64 / total_requests as f64
    } else {
        0.0
    };

    // In-flight = permits currently taken from the semaphore. Saturating so
    // that an unexpected surplus of permits cannot underflow (which would
    // panic in debug builds and wrap in release builds).
    let inflight = self
        .config
        .max_inflight
        .saturating_sub(self.semaphore.available_permits());

    PoolMetrics {
        total_proxies: total,
        healthy_proxies: healthy,
        cooldown_proxies: cooldown,
        dead_proxies: dead,
        pending_tasks: self.task_pool.pending_count(),
        delayed_tasks: self.task_pool.delayed_count(),
        completed_tasks: self.task_pool.completed_count(),
        failed_tasks: self.task_pool.failed_count(),
        throughput_1s: self.throughput.throughput(Duration::from_secs(1)),
        throughput_10s: self.throughput.throughput(Duration::from_secs(10)),
        throughput_60s: self.throughput.throughput(Duration::from_secs(60)),
        success_rate_1m: success_rate,
        avg_latency_ms: self.health.avg_latency_ms(),
        inflight,
        requeued_tasks: self.task_pool.requeued_count(),
        zero_available_events: self.task_pool.zero_available_count(),
        skipped_no_permit: self.task_pool.skipped_no_permit_count(),
        skipped_rate_limit: self.task_pool.skipped_rate_limit_count(),
        skipped_cooldown: self.task_pool.skipped_cooldown_count(),
        dispatch_count: self.task_pool.dispatch_count(),
    }
}
pub async fn shutdown(self) {
info!("initiating scatter-proxy shutdown");
let _ = self.shutdown_tx.send(());
if let Some(ref path) = self.config.state_file {
let health_stats = self.health.get_all_stats();
let proxy_states = build_proxy_states(&self.proxy_manager);
if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
warn!(error = %e, "failed to save final state during shutdown");
} else {
debug!("saved final state to disk");
}
}
info!("scatter-proxy shutdown complete");
}
}
/// Maps every proxy URL known to `proxy_manager` to a string label of its
/// current state (`"Active"`, `"Dead"` or `"Unknown"`).
///
/// The result is what gets handed to `persist::save_state`; the `"Active"`
/// and `"Dead"` labels are the ones recognised again when persisted state is
/// restored during construction.
fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
    proxy_manager
        .all_proxy_urls()
        .into_iter()
        .map(|proxy_url| {
            let label = match proxy_manager.get_state(&proxy_url) {
                proxy::ProxyState::Active => "Active",
                proxy::ProxyState::Dead => "Dead",
                proxy::ProxyState::Unknown => "Unknown",
            };
            (proxy_url, label.to_string())
        })
        .collect()
}