scatter-proxy 0.2.0

Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput
Documentation
//! scatter-proxy — async request scheduler for unreliable SOCKS5 proxies.
//!
//! Multi-path race for maximum throughput.

mod circuit_breaker;
mod classifier;
mod config;
mod error;
mod health;
mod metrics;
mod persist;
mod proxy;
mod rate_limit;
mod scheduler;
mod score;
mod task;

// Re-exports (public API surface)
pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
pub use config::{RateLimitConfig, ScatterProxyConfig, DEFAULT_PROXY_SOURCES};
pub use error::ScatterProxyError;
pub use metrics::{PoolMetrics, ProxyHostStats};
pub use task::{ScatterResponse, TaskHandle};

// Also re-export useful types for the classifier trait
pub use http::HeaderMap;
pub use http::StatusCode;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, Semaphore};
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};

/// The main entry point. Manages proxy pool, scheduling, and background tasks.
///
/// Built by [`ScatterProxy::new`], which also spawns the background tasks
/// whose join handles are stored here. Requests enter through
/// [`ScatterProxy::submit`] / [`ScatterProxy::submit_batch`]; call
/// [`ScatterProxy::shutdown`] to signal the background tasks to stop.
// NOTE(review): the `dead_code` allowance is presumably needed because some
// fields (e.g. `rate_limiter` and the join handles) are stored but never read
// back in this file — confirm before removing it.
#[allow(dead_code)]
pub struct ScatterProxy {
    /// Shared configuration; also handed to the scheduler and the background tasks.
    config: Arc<ScatterProxyConfig>,
    /// Receives submitted requests and supplies the pending/completed/failed counters.
    task_pool: Arc<task::TaskPool>,
    /// Per-proxy, per-host statistics; restored from and saved to the state file.
    health: Arc<health::HealthTracker>,
    /// Built from `config.rate_limit` and shared with the scheduler.
    rate_limiter: Arc<rate_limit::RateLimiter>,
    /// Shared with the scheduler; its `get_all()` snapshot is reported in metrics.
    circuit_breakers: Arc<circuit_breaker::CircuitBreakerManager>,
    /// Holds the proxies fetched from the sources together with their states.
    proxy_manager: Arc<proxy::ProxyManager>,
    /// Queried for throughput over 1s / 10s / 60s windows.
    throughput: Arc<metrics::ThroughputTracker>,
    /// Created with `config.max_inflight` permits; the in-flight figure is
    /// `max_inflight` minus the currently available permits.
    semaphore: Arc<Semaphore>,
    /// Broadcast sender every background task subscribes to; `shutdown` sends on it.
    shutdown_tx: broadcast::Sender<()>,
    // Join handles of the tasks spawned in `new`. They are only stored, never
    // polled by the code in this file.
    _scheduler_handle: JoinHandle<()>,
    /// `Some` only when `config.state_file` is set.
    _persist_handle: Option<JoinHandle<()>>,
    _metrics_handle: JoinHandle<()>,
    _refresh_handle: JoinHandle<()>,
}

impl ScatterProxy {
    /// Create a new `ScatterProxy` from config and a body classifier.
    ///
    /// This is an async constructor that fetches proxies, restores persisted
    /// state, and spawns all background tasks (scheduler, persistence, metrics
    /// logging, source refresh).
    pub async fn new(
        config: ScatterProxyConfig,
        classifier: impl BodyClassifier,
    ) -> Result<Self, ScatterProxyError> {
        let config = Arc::new(config);

        // Build all Arc-wrapped components from config.
        let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
        let health = Arc::new(health::HealthTracker::new(config.health_window));
        let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
        let circuit_breakers = Arc::new(circuit_breaker::CircuitBreakerManager::new(
            config.circuit_breaker_threshold,
            config.circuit_breaker_probe_interval,
        ));
        let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
        let throughput = Arc::new(metrics::ThroughputTracker::new());
        let semaphore = Arc::new(Semaphore::new(config.max_inflight));
        let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);

        // Determine sources: use configured sources, or fall back to defaults.
        let effective_sources: Vec<String> = if config.sources.is_empty() {
            info!("no custom sources configured, using default free proxy sources");
            DEFAULT_PROXY_SOURCES
                .iter()
                .map(|s| s.to_string())
                .collect()
        } else {
            config.sources.clone()
        };

        // Fetch proxies from sources.
        let proxy_count = proxy_manager
            .fetch_and_add(&effective_sources, config.prefer_remote_dns)
            .await?;

        // Restore persisted state if configured.
        if let Some(ref state_path) = config.state_file {
            match persist::load_state(state_path).await {
                Ok(Some(persisted)) => {
                    for (proxy_url, persisted_proxy) in &persisted.proxies {
                        // Restore per-host health data.
                        for (host, stats) in &persisted_proxy.hosts {
                            health.restore(proxy_url, host, stats);
                        }
                        // Restore proxy state.
                        match persisted_proxy.state.as_str() {
                            "Active" => {
                                proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
                            }
                            "Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
                            _ => {} // Unknown or unrecognised — leave as default
                        }
                    }
                    debug!(
                        proxies = persisted.proxies.len(),
                        "restored persisted state"
                    );
                }
                Ok(None) => {
                    debug!("no persisted state file found, starting fresh");
                }
                Err(e) => {
                    warn!(error = %e, "failed to load persisted state, starting fresh");
                }
            }
        }

        info!(count = proxy_count, "loaded proxies from sources");

        // Shutdown broadcast channel.
        let (shutdown_tx, _) = broadcast::channel(1);

        // ── Spawn background tasks ──────────────────────────────────────

        // (a) Scheduler
        let sched = scheduler::Scheduler::new(
            Arc::clone(&config),
            Arc::clone(&task_pool),
            Arc::clone(&health),
            Arc::clone(&rate_limiter),
            Arc::clone(&circuit_breakers),
            Arc::clone(&proxy_manager),
            Arc::clone(&classifier),
            Arc::clone(&semaphore),
            Arc::clone(&throughput),
        );
        let shutdown_rx_sched = shutdown_tx.subscribe();
        let _scheduler_handle = tokio::spawn(async move {
            sched.run(shutdown_rx_sched).await;
        });

        // (b) State persistence (only if state_file is configured)
        let _persist_handle = if config.state_file.is_some() {
            let persist_config = Arc::clone(&config);
            let persist_health = Arc::clone(&health);
            let persist_proxy_mgr = Arc::clone(&proxy_manager);
            let mut shutdown_rx_persist = shutdown_tx.subscribe();
            let interval = config.state_save_interval;

            Some(tokio::spawn(async move {
                loop {
                    tokio::select! {
                        _ = shutdown_rx_persist.recv() => {
                            debug!("persist task received shutdown signal");
                            break;
                        }
                        _ = tokio::time::sleep(interval) => {
                            if let Some(ref path) = persist_config.state_file {
                                let health_stats = persist_health.get_all_stats();
                                let proxy_states = build_proxy_states(&persist_proxy_mgr);
                                if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
                                    warn!(error = %e, "failed to save state");
                                } else {
                                    debug!("persisted state to disk");
                                }
                            }
                        }
                    }
                }
            }))
        } else {
            None
        };

        // (c) Metrics logging
        let _metrics_config = Arc::clone(&config);
        let metrics_task_pool = Arc::clone(&task_pool);
        let metrics_health = Arc::clone(&health);
        let metrics_proxy_mgr = Arc::clone(&proxy_manager);
        let metrics_throughput = Arc::clone(&throughput);
        let metrics_semaphore = Arc::clone(&semaphore);
        let metrics_cb = Arc::clone(&circuit_breakers);
        let mut shutdown_rx_metrics = shutdown_tx.subscribe();
        let metrics_interval = config.metrics_log_interval;
        let max_inflight = config.max_inflight;

        let _metrics_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = shutdown_rx_metrics.recv() => {
                        debug!("metrics task received shutdown signal");
                        break;
                    }
                    _ = tokio::time::sleep(metrics_interval) => {
                        let tp = metrics_throughput.throughput(Duration::from_secs(10));
                        let total_s = metrics_health.total_success();
                        let total_f = metrics_health.total_fail();
                        let total = total_s + total_f;
                        let success_pct = if total > 0 {
                            (total_s as f64 / total as f64) * 100.0
                        } else {
                            0.0
                        };
                        let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
                        let pending = metrics_task_pool.pending_count();
                        let done = metrics_task_pool.completed_count();
                        let failed = metrics_task_pool.failed_count();
                        let inflight = max_inflight - metrics_semaphore.available_permits();
                        let breakers = metrics_cb.get_all();
                        let open_breakers: Vec<&String> = breakers.iter()
                            .filter(|(_, &open)| open)
                            .map(|(h, _)| h)
                            .collect();

                        info!(
                            "throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} done {} failed | inflight={} | breakers: {}",
                            tp,
                            success_pct,
                            healthy,
                            cooldown,
                            dead,
                            pending,
                            done,
                            failed,
                            inflight,
                            if open_breakers.is_empty() {
                                "none".to_string()
                            } else {
                                format!("{:?}", open_breakers)
                            }
                        );
                    }
                }
            }
        });

        // (d) Source refresh
        let effective_sources = Arc::new(effective_sources);
        let refresh_sources = Arc::clone(&effective_sources);
        let refresh_config = Arc::clone(&config);
        let refresh_proxy_mgr = Arc::clone(&proxy_manager);
        let mut shutdown_rx_refresh = shutdown_tx.subscribe();
        let refresh_interval = config.source_refresh_interval;

        let _refresh_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = shutdown_rx_refresh.recv() => {
                        debug!("refresh task received shutdown signal");
                        break;
                    }
                    _ = tokio::time::sleep(refresh_interval) => {
                        match refresh_proxy_mgr
                            .fetch_and_add(&refresh_sources, refresh_config.prefer_remote_dns)
                            .await
                        {
                            Ok(count) => {
                                debug!(new_count = count, "refreshed proxy sources");
                            }
                            Err(e) => {
                                warn!(error = %e, "failed to refresh proxy sources");
                            }
                        }
                    }
                }
            }
        });

        Ok(ScatterProxy {
            config,
            task_pool,
            health,
            rate_limiter,
            circuit_breakers,
            proxy_manager,
            throughput,
            semaphore,
            shutdown_tx,
            _scheduler_handle,
            _persist_handle,
            _metrics_handle,
            _refresh_handle,
        })
    }

    /// Queue one request for proxied execution.
    ///
    /// The request is handed to the task pool together with the configured
    /// `max_attempts` and `task_timeout`. On success the returned
    /// [`TaskHandle`] is a `Future` that resolves to the proxied response
    /// (or an error).
    ///
    /// # Errors
    ///
    /// Propagates whatever error the task pool's `submit` returns.
    pub fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
        let max_attempts = self.config.max_attempts;
        let task_timeout = self.config.task_timeout;
        self.task_pool.submit(request, max_attempts, task_timeout)
    }

    /// Queue several requests for proxied execution in one call.
    ///
    /// The batch is all-or-nothing: if the pool doesn't have enough capacity
    /// the entire batch is rejected. Every request gets the configured
    /// `max_attempts` and `task_timeout`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the task pool's `submit_batch` returns.
    pub fn submit_batch(
        &self,
        requests: Vec<reqwest::Request>,
    ) -> Result<Vec<TaskHandle>, ScatterProxyError> {
        let max_attempts = self.config.max_attempts;
        let task_timeout = self.config.task_timeout;
        self.task_pool
            .submit_batch(requests, max_attempts, task_timeout)
    }

    /// Take a [`PoolMetrics`] snapshot by querying every internal component.
    ///
    /// * Proxy counts come from `proxy_manager.proxy_counts()`.
    /// * `success_rate_1m` is `total_success / (total_success + total_fail)`
    ///   as reported by the health tracker, or `0.0` when both are zero.
    /// * `inflight` is `config.max_inflight` minus the semaphore's currently
    ///   available permits.
    pub fn metrics(&self) -> PoolMetrics {
        let (total_proxies, healthy_proxies, cooldown_proxies, dead_proxies) =
            self.proxy_manager.proxy_counts();

        let succeeded = self.health.total_success();
        let attempted = succeeded + self.health.total_fail();
        // Guard the division: no finished requests yet means a rate of 0.0.
        let success_rate_1m = if attempted > 0 {
            succeeded as f64 / attempted as f64
        } else {
            0.0
        };

        let pending_tasks = self.task_pool.pending_count();
        let completed_tasks = self.task_pool.completed_count();
        let failed_tasks = self.task_pool.failed_count();

        let throughput_1s = self.throughput.throughput(Duration::from_secs(1));
        let throughput_10s = self.throughput.throughput(Duration::from_secs(10));
        let throughput_60s = self.throughput.throughput(Duration::from_secs(60));

        let avg_latency_ms = self.health.avg_latency_ms();

        let inflight = self.config.max_inflight - self.semaphore.available_permits();
        let circuit_breakers = self.circuit_breakers.get_all();

        PoolMetrics {
            total_proxies,
            healthy_proxies,
            cooldown_proxies,
            dead_proxies,

            pending_tasks,
            completed_tasks,
            failed_tasks,

            throughput_1s,
            throughput_10s,
            throughput_60s,

            success_rate_1m,
            avg_latency_ms,

            inflight,
            circuit_breakers,
        }
    }

    /// Gracefully shut down the proxy: signal all background tasks, persist
    /// final state, and drop resources.
    pub async fn shutdown(self) {
        info!("initiating scatter-proxy shutdown");

        // Signal all background tasks to stop.
        let _ = self.shutdown_tx.send(());

        // Final state save if configured.
        if let Some(ref path) = self.config.state_file {
            let health_stats = self.health.get_all_stats();
            let proxy_states = build_proxy_states(&self.proxy_manager);
            if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
                warn!(error = %e, "failed to save final state during shutdown");
            } else {
                debug!("saved final state to disk");
            }
        }

        info!("scatter-proxy shutdown complete");
        // JoinHandles are dropped here, which detaches the background tasks
        // (they will exit via the shutdown signal).
    }
}

/// Snapshot every proxy known to `proxy_manager` as a `url → state label` map.
///
/// The labels written are `"Active"`, `"Dead"` and `"Unknown"`; this map is
/// what gets passed to `persist::save_state`.
fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
    proxy_manager
        .all_proxy_urls()
        .into_iter()
        .map(|url| {
            let label = match proxy_manager.get_state(&url) {
                proxy::ProxyState::Active => "Active",
                proxy::ProxyState::Dead => "Dead",
                proxy::ProxyState::Unknown => "Unknown",
            };
            (url, label.to_string())
        })
        .collect()
}