scatter-proxy 0.7.0

Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput
Documentation
use std::collections::HashMap;
use std::sync::Arc;

use tracing::info;

use crate::classifier::BodyClassifier;
use crate::config::ScatterProxyConfig;
use crate::error::ScatterProxyError;
use crate::metrics::PoolMetrics;
use crate::task::TaskHandle;
use crate::ScatterProxy;

/// Per-host dispatcher that owns one dedicated [`ScatterProxy`] pool for every
/// registered host.
///
/// Rather than sharing a single pool across all targets, the router keeps a
/// separate proxy pool, health tracker and scheduler for each host.  That
/// separation buys three things:
///
/// - **No cross-host starvation.** When one host is struggling (every proxy
///   cooling down, poor success rate), tasks aimed at other hosts keep flowing.
/// - **Host-scoped eviction.** A proxy that keeps failing against host A is
///   marked dead only in A's pool; host B can still use it.
/// - **Host-scoped metrics.** [`metrics_for`] reports throughput and health
///   for a single host without mixing in traffic sent to the others.
///
/// When the router is constructed, every pool starts from the complete proxy
/// list taken from `config.sources`.  After that, each pool evolves on its own.
///
/// # Example
///
/// ```no_run
/// use scatter_proxy::{ScatterProxyRouter, ScatterProxyConfig, DefaultClassifier};
///
/// # async fn example() -> Result<(), scatter_proxy::ScatterProxyError> {
/// let router = ScatterProxyRouter::new(
///     ["szse.cn", "sse.com.cn", "cninfo.com.cn"],
///     ScatterProxyConfig::default(),
///     DefaultClassifier,
/// ).await?;
///
/// let client = reqwest::Client::new();
/// let req = client.get("http://szse.cn/api/data").build().unwrap();
/// let handle = router.submit(req).await?;
/// // Await `handle` directly, or bound it with `handle.with_timeout(…)`.
///
/// // Health and throughput for one host only
/// if let Some(m) = router.metrics_for("szse.cn") {
///     println!("szse success rate: {:.0}%", m.success_rate_1m * 100.0);
/// }
///
/// router.shutdown().await;
/// # Ok(())
/// # }
/// ```
///
/// [`metrics_for`]: ScatterProxyRouter::metrics_for
pub struct ScatterProxyRouter {
    /// Registered hostname → the pool that serves requests for that host.
    routes: HashMap<String, ScatterProxy>,
}

impl ScatterProxyRouter {
    /// Build a router with one independent [`ScatterProxy`] pool per host.
    ///
    /// `hosts` is a list of bare hostnames (e.g. `"szse.cn"`).  Requests
    /// submitted to the router are matched against these names via
    /// [`request.url().host_str()`](reqwest::Url::host_str).
    ///
    /// The same `config` (cloned) and `classifier` (shared via `Arc`) are used
    /// for every pool.  If you need different parameters per host, build the
    /// pools manually and route requests yourself.
    ///
    /// # Errors
    ///
    /// Returns [`ScatterProxyError::Init`] if any host pool fails to initialise
    /// (e.g. the proxy source URL is unreachable and the list is empty).
    pub async fn new(
        hosts: impl IntoIterator<Item = impl Into<String>>,
        config: ScatterProxyConfig,
        classifier: impl BodyClassifier,
    ) -> Result<Self, ScatterProxyError> {
        let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
        let mut routes = HashMap::new();

        for host in hosts {
            let host = host.into();
            let mut host_config = config.clone();
            if host_config.name.is_none() {
                host_config.name = Some(host.clone());
            }
            let sp = ScatterProxy::new_arc(host_config, Arc::clone(&classifier)).await?;
            routes.insert(host, sp);
        }

        info!(hosts = routes.len(), "ScatterProxyRouter initialised");
        Ok(Self { routes })
    }

    /// Submit a request to the pool registered for its host.
    ///
    /// Blocks until the pool has capacity, then returns a [`TaskHandle`].
    ///
    /// # Errors
    ///
    /// Returns [`ScatterProxyError::UnknownHost`] if the request's host is not
    /// registered in this router.
    pub async fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
        let sp = self.route(&request)?;
        Ok(sp.submit(request).await)
    }

    /// Non-blocking submit variant.
    ///
    /// # Errors
    ///
    /// Returns [`ScatterProxyError::UnknownHost`] if the host is not registered,
    /// or [`ScatterProxyError::PoolFull`] if the host's pool is at capacity.
    pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
        let sp = self.route(&request)?;
        sp.try_submit(request)
    }

    /// Returns a [`PoolMetrics`] snapshot for the given host.
    ///
    /// Returns `None` if the host is not registered in this router.
    pub fn metrics_for(&self, host: &str) -> Option<PoolMetrics> {
        self.routes.get(host).map(|sp| sp.metrics())
    }

    /// Returns [`PoolMetrics`] snapshots for every registered host.
    pub fn all_metrics(&self) -> HashMap<String, PoolMetrics> {
        self.routes
            .iter()
            .map(|(host, sp)| (host.clone(), sp.metrics()))
            .collect()
    }

    /// Returns the list of registered hostnames.
    pub fn hosts(&self) -> Vec<&str> {
        self.routes.keys().map(String::as_str).collect()
    }

    /// Gracefully shut down all host pools (persists state for each if configured).
    pub async fn shutdown(self) {
        for (_, sp) in self.routes {
            sp.shutdown().await;
        }
    }

    fn route(&self, request: &reqwest::Request) -> Result<&ScatterProxy, ScatterProxyError> {
        let host = request.url().host_str().unwrap_or("").to_string();
        self.routes
            .get(&host)
            .ok_or(ScatterProxyError::UnknownHost(host))
    }
}