mod circuit_breaker;
mod classifier;
mod config;
mod error;
mod health;
mod metrics;
mod persist;
mod proxy;
mod rate_limit;
mod scheduler;
mod score;
mod task;
pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
pub use config::{RateLimitConfig, ScatterProxyConfig};
pub use error::ScatterProxyError;
pub use metrics::{PoolMetrics, ProxyHostStats};
pub use task::{ScatterResponse, TaskHandle};
pub use http::HeaderMap;
pub use http::StatusCode;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, Semaphore};
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
/// Owner of a running scatter-proxy pool.
///
/// Bundles the shared components created by [`ScatterProxy::new`] together with
/// the broadcast sender used to signal shutdown and the join handles of the
/// background tasks spawned during construction.
// NOTE(review): the `dead_code` allowance is presumably needed because several
// fields (e.g. `rate_limiter` and the task handles) are stored but not read by
// the methods visible in this file — confirm before removing.
#[allow(dead_code)]
pub struct ScatterProxy {
/// Shared configuration; read by `submit`, `submit_batch`, `metrics` and `shutdown`.
config: Arc<ScatterProxyConfig>,
/// Task pool that `submit`/`submit_batch` forward to and whose pending/completed/failed counts are reported.
task_pool: Arc<task::TaskPool>,
/// Health tracker; supplies success/failure totals, average latency and the stats written to the state file.
health: Arc<health::HealthTracker>,
/// Rate limiter built from `config.rate_limit`; also handed to the scheduler.
rate_limiter: Arc<rate_limit::RateLimiter>,
/// Circuit-breaker manager; also handed to the scheduler and reported by `metrics`.
circuit_breakers: Arc<circuit_breaker::CircuitBreakerManager>,
/// Proxy manager holding the proxies fetched from `config.sources` and their states.
proxy_manager: Arc<proxy::ProxyManager>,
/// Throughput tracker queried over 1 s / 10 s / 60 s windows by `metrics`.
throughput: Arc<metrics::ThroughputTracker>,
/// Semaphore created with `config.max_inflight` permits; `metrics` derives the in-flight count from it.
semaphore: Arc<Semaphore>,
/// Broadcast sender; `shutdown` sends `()` on it to stop the background tasks.
shutdown_tx: broadcast::Sender<()>,
/// Handle of the task running the scheduler.
_scheduler_handle: JoinHandle<()>,
/// Handle of the periodic state-saving task; `None` when `config.state_file` is unset.
_persist_handle: Option<JoinHandle<()>>,
/// Handle of the periodic metrics-logging task.
_metrics_handle: JoinHandle<()>,
/// Handle of the periodic proxy-source refresh task.
_refresh_handle: JoinHandle<()>,
}
impl ScatterProxy {
pub async fn new(
config: ScatterProxyConfig,
classifier: impl BodyClassifier,
) -> Result<Self, ScatterProxyError> {
let config = Arc::new(config);
let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
let health = Arc::new(health::HealthTracker::new(config.health_window));
let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
let circuit_breakers = Arc::new(circuit_breaker::CircuitBreakerManager::new(
config.circuit_breaker_threshold,
config.circuit_breaker_probe_interval,
));
let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
let throughput = Arc::new(metrics::ThroughputTracker::new());
let semaphore = Arc::new(Semaphore::new(config.max_inflight));
let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
let proxy_count = proxy_manager
.fetch_and_add(&config.sources, config.prefer_remote_dns)
.await?;
if let Some(ref state_path) = config.state_file {
match persist::load_state(state_path).await {
Ok(Some(persisted)) => {
for (proxy_url, persisted_proxy) in &persisted.proxies {
for (host, stats) in &persisted_proxy.hosts {
health.restore(proxy_url, host, stats);
}
match persisted_proxy.state.as_str() {
"Active" => {
proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
}
"Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
_ => {} }
}
debug!(
proxies = persisted.proxies.len(),
"restored persisted state"
);
}
Ok(None) => {
debug!("no persisted state file found, starting fresh");
}
Err(e) => {
warn!(error = %e, "failed to load persisted state, starting fresh");
}
}
}
info!(count = proxy_count, "loaded proxies from sources");
let (shutdown_tx, _) = broadcast::channel(1);
let sched = scheduler::Scheduler::new(
Arc::clone(&config),
Arc::clone(&task_pool),
Arc::clone(&health),
Arc::clone(&rate_limiter),
Arc::clone(&circuit_breakers),
Arc::clone(&proxy_manager),
Arc::clone(&classifier),
Arc::clone(&semaphore),
Arc::clone(&throughput),
);
let shutdown_rx_sched = shutdown_tx.subscribe();
let _scheduler_handle = tokio::spawn(async move {
sched.run(shutdown_rx_sched).await;
});
let _persist_handle = if config.state_file.is_some() {
let persist_config = Arc::clone(&config);
let persist_health = Arc::clone(&health);
let persist_proxy_mgr = Arc::clone(&proxy_manager);
let mut shutdown_rx_persist = shutdown_tx.subscribe();
let interval = config.state_save_interval;
Some(tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_rx_persist.recv() => {
debug!("persist task received shutdown signal");
break;
}
_ = tokio::time::sleep(interval) => {
if let Some(ref path) = persist_config.state_file {
let health_stats = persist_health.get_all_stats();
let proxy_states = build_proxy_states(&persist_proxy_mgr);
if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
warn!(error = %e, "failed to save state");
} else {
debug!("persisted state to disk");
}
}
}
}
}
}))
} else {
None
};
let _metrics_config = Arc::clone(&config);
let metrics_task_pool = Arc::clone(&task_pool);
let metrics_health = Arc::clone(&health);
let metrics_proxy_mgr = Arc::clone(&proxy_manager);
let metrics_throughput = Arc::clone(&throughput);
let metrics_semaphore = Arc::clone(&semaphore);
let metrics_cb = Arc::clone(&circuit_breakers);
let mut shutdown_rx_metrics = shutdown_tx.subscribe();
let metrics_interval = config.metrics_log_interval;
let max_inflight = config.max_inflight;
let _metrics_handle = tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_rx_metrics.recv() => {
debug!("metrics task received shutdown signal");
break;
}
_ = tokio::time::sleep(metrics_interval) => {
let tp = metrics_throughput.throughput(Duration::from_secs(10));
let total_s = metrics_health.total_success();
let total_f = metrics_health.total_fail();
let total = total_s + total_f;
let success_pct = if total > 0 {
(total_s as f64 / total as f64) * 100.0
} else {
0.0
};
let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
let pending = metrics_task_pool.pending_count();
let done = metrics_task_pool.completed_count();
let failed = metrics_task_pool.failed_count();
let inflight = max_inflight - metrics_semaphore.available_permits();
let breakers = metrics_cb.get_all();
let open_breakers: Vec<&String> = breakers.iter()
.filter(|(_, &open)| open)
.map(|(h, _)| h)
.collect();
info!(
"throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} done {} failed | inflight={} | breakers: {}",
tp,
success_pct,
healthy,
cooldown,
dead,
pending,
done,
failed,
inflight,
if open_breakers.is_empty() {
"none".to_string()
} else {
format!("{:?}", open_breakers)
}
);
}
}
}
});
let refresh_config = Arc::clone(&config);
let refresh_proxy_mgr = Arc::clone(&proxy_manager);
let mut shutdown_rx_refresh = shutdown_tx.subscribe();
let refresh_interval = config.source_refresh_interval;
let _refresh_handle = tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_rx_refresh.recv() => {
debug!("refresh task received shutdown signal");
break;
}
_ = tokio::time::sleep(refresh_interval) => {
match refresh_proxy_mgr
.fetch_and_add(&refresh_config.sources, refresh_config.prefer_remote_dns)
.await
{
Ok(count) => {
debug!(new_count = count, "refreshed proxy sources");
}
Err(e) => {
warn!(error = %e, "failed to refresh proxy sources");
}
}
}
}
}
});
Ok(ScatterProxy {
config,
task_pool,
health,
rate_limiter,
circuit_breakers,
proxy_manager,
throughput,
semaphore,
shutdown_tx,
_scheduler_handle,
_persist_handle,
_metrics_handle,
_refresh_handle,
})
}
/// Hands a single request to the task pool.
///
/// The attempt limit and per-task timeout are taken from the configuration
/// (`max_attempts`, `task_timeout`). Returns whatever the pool's `submit` returns:
/// a [`TaskHandle`] on success or a [`ScatterProxyError`].
pub fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
    let cfg = &self.config;
    self.task_pool
        .submit(request, cfg.max_attempts, cfg.task_timeout)
}
/// Hands a batch of requests to the task pool in one call.
///
/// Every request uses the configured `max_attempts` and `task_timeout`. Returns
/// whatever the pool's `submit_batch` returns: the task handles on success or a
/// [`ScatterProxyError`].
pub fn submit_batch(
    &self,
    requests: Vec<reqwest::Request>,
) -> Result<Vec<TaskHandle>, ScatterProxyError> {
    let cfg = &self.config;
    self.task_pool
        .submit_batch(requests, cfg.max_attempts, cfg.task_timeout)
}
/// Takes a point-in-time snapshot of the pool.
///
/// Combines proxy counts, task counters, throughput over 1 s / 10 s / 60 s windows,
/// average latency, the in-flight count (`max_inflight` minus the semaphore's
/// available permits) and the circuit-breaker map. `success_rate_1m` is the health
/// tracker's success total divided by its success + failure total, or `0.0` when
/// that sum is not positive.
pub fn metrics(&self) -> PoolMetrics {
    let (total_proxies, healthy_proxies, cooldown_proxies, dead_proxies) =
        self.proxy_manager.proxy_counts();

    let succeeded = self.health.total_success();
    let errored = self.health.total_fail();
    let attempts = succeeded + errored;
    // Guard the division so an idle pool reports 0.0 rather than NaN.
    let success_rate_1m = (attempts > 0)
        .then(|| succeeded as f64 / attempts as f64)
        .unwrap_or(0.0);

    // Remaining readings are taken in the same order the fields are declared below.
    let pending_tasks = self.task_pool.pending_count();
    let completed_tasks = self.task_pool.completed_count();
    let failed_tasks = self.task_pool.failed_count();
    let throughput_1s = self.throughput.throughput(Duration::from_secs(1));
    let throughput_10s = self.throughput.throughput(Duration::from_secs(10));
    let throughput_60s = self.throughput.throughput(Duration::from_secs(60));
    let avg_latency_ms = self.health.avg_latency_ms();
    let inflight = self.config.max_inflight - self.semaphore.available_permits();
    let circuit_breakers = self.circuit_breakers.get_all();

    PoolMetrics {
        total_proxies,
        healthy_proxies,
        cooldown_proxies,
        dead_proxies,
        pending_tasks,
        completed_tasks,
        failed_tasks,
        throughput_1s,
        throughput_10s,
        throughput_60s,
        success_rate_1m,
        avg_latency_ms,
        inflight,
        circuit_breakers,
    }
}
pub async fn shutdown(self) {
info!("initiating scatter-proxy shutdown");
let _ = self.shutdown_tx.send(());
if let Some(ref path) = self.config.state_file {
let health_stats = self.health.get_all_stats();
let proxy_states = build_proxy_states(&self.proxy_manager);
if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
warn!(error = %e, "failed to save final state during shutdown");
} else {
debug!("saved final state to disk");
}
}
info!("scatter-proxy shutdown complete");
}
}
/// Builds a `proxy URL -> state label` map for every URL the manager reports.
///
/// Labels are `"Active"`, `"Dead"` or `"Unknown"`, mirroring the
/// `proxy::ProxyState` variants.
fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
    proxy_manager
        .all_proxy_urls()
        .into_iter()
        .map(|proxy_url| {
            let label = match proxy_manager.get_state(&proxy_url) {
                proxy::ProxyState::Active => "Active",
                proxy::ProxyState::Dead => "Dead",
                proxy::ProxyState::Unknown => "Unknown",
            };
            (proxy_url, label.to_string())
        })
        .collect()
}