//! `scatter_proxy/router.rs` — per-host request routing.

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tracing::info;
5
6use crate::classifier::BodyClassifier;
7use crate::config::ScatterProxyConfig;
8use crate::error::ScatterProxyError;
9use crate::metrics::PoolMetrics;
10use crate::task::TaskHandle;
11use crate::ScatterProxy;
12
13/// Routes proxied requests to per-host [`ScatterProxy`] instances.
14///
15/// Each registered host gets its own independent proxy pool, health tracker,
16/// and scheduler.  Isolation means:
17///
18/// - A struggling host (all proxies in cooldown, low success rate) cannot
19///   starve tasks for other hosts.
20/// - Proxy eviction is scoped per-host: a proxy that repeatedly fails for
21///   host A is marked dead only in A's pool, while remaining available to B.
22/// - [`metrics_for`] gives a clean, host-scoped view of throughput and health.
23///
24/// Every pool is initialised with the full proxy list from `config.sources`
25/// at construction time and then evolves independently.
26///
27/// # Example
28///
29/// ```no_run
30/// use scatter_proxy::{ScatterProxyRouter, ScatterProxyConfig, DefaultClassifier};
31///
32/// # async fn example() -> Result<(), scatter_proxy::ScatterProxyError> {
33/// let router = ScatterProxyRouter::new(
34///     ["szse.cn", "sse.com.cn", "cninfo.com.cn"],
35///     ScatterProxyConfig::default(),
36///     DefaultClassifier,
37/// ).await?;
38///
39/// let client = reqwest::Client::new();
40/// let req = client.get("http://szse.cn/api/data").build().unwrap();
41/// let handle = router.submit(req).await?;
42/// // handle.await or handle.with_timeout(…) as usual
43///
44/// // Per-host observability
45/// if let Some(m) = router.metrics_for("szse.cn") {
46///     println!("szse success rate: {:.0}%", m.success_rate_1m * 100.0);
47/// }
48///
49/// router.shutdown().await;
50/// # Ok(())
51/// # }
52/// ```
53///
54/// [`metrics_for`]: ScatterProxyRouter::metrics_for
55pub struct ScatterProxyRouter {
56    routes: HashMap<String, ScatterProxy>,
57}
58
59impl ScatterProxyRouter {
60    /// Build a router with one independent [`ScatterProxy`] pool per host.
61    ///
62    /// `hosts` is a list of bare hostnames (e.g. `"szse.cn"`).  Requests
63    /// submitted to the router are matched against these names via
64    /// [`request.url().host_str()`](reqwest::Url::host_str).
65    ///
66    /// The same `config` (cloned) and `classifier` (shared via `Arc`) are used
67    /// for every pool.  If you need different parameters per host, build the
68    /// pools manually and route requests yourself.
69    ///
70    /// # Errors
71    ///
72    /// Returns [`ScatterProxyError::Init`] if any host pool fails to initialise
73    /// (e.g. the proxy source URL is unreachable and the list is empty).
74    pub async fn new(
75        hosts: impl IntoIterator<Item = impl Into<String>>,
76        config: ScatterProxyConfig,
77        classifier: impl BodyClassifier,
78    ) -> Result<Self, ScatterProxyError> {
79        let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
80        let mut routes = HashMap::new();
81
82        for host in hosts {
83            let host = host.into();
84            let mut host_config = config.clone();
85            if host_config.name.is_none() {
86                host_config.name = Some(host.clone());
87            }
88            let sp = ScatterProxy::new_arc(host_config, Arc::clone(&classifier)).await?;
89            routes.insert(host, sp);
90        }
91
92        info!(hosts = routes.len(), "ScatterProxyRouter initialised");
93        Ok(Self { routes })
94    }
95
96    /// Submit a request to the pool registered for its host.
97    ///
98    /// Blocks until the pool has capacity, then returns a [`TaskHandle`].
99    ///
100    /// # Errors
101    ///
102    /// Returns [`ScatterProxyError::UnknownHost`] if the request's host is not
103    /// registered in this router.
104    pub async fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
105        let sp = self.route(&request)?;
106        Ok(sp.submit(request).await)
107    }
108
109    /// Non-blocking submit variant.
110    ///
111    /// # Errors
112    ///
113    /// Returns [`ScatterProxyError::UnknownHost`] if the host is not registered,
114    /// or [`ScatterProxyError::PoolFull`] if the host's pool is at capacity.
115    pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
116        let sp = self.route(&request)?;
117        sp.try_submit(request)
118    }
119
120    /// Returns a [`PoolMetrics`] snapshot for the given host.
121    ///
122    /// Returns `None` if the host is not registered in this router.
123    pub fn metrics_for(&self, host: &str) -> Option<PoolMetrics> {
124        self.routes.get(host).map(|sp| sp.metrics())
125    }
126
127    /// Returns [`PoolMetrics`] snapshots for every registered host.
128    pub fn all_metrics(&self) -> HashMap<String, PoolMetrics> {
129        self.routes
130            .iter()
131            .map(|(host, sp)| (host.clone(), sp.metrics()))
132            .collect()
133    }
134
135    /// Returns the list of registered hostnames.
136    pub fn hosts(&self) -> Vec<&str> {
137        self.routes.keys().map(String::as_str).collect()
138    }
139
140    /// Gracefully shut down all host pools (persists state for each if configured).
141    pub async fn shutdown(self) {
142        for (_, sp) in self.routes {
143            sp.shutdown().await;
144        }
145    }
146
147    fn route(&self, request: &reqwest::Request) -> Result<&ScatterProxy, ScatterProxyError> {
148        let host = request.url().host_str().unwrap_or("").to_string();
149        self.routes
150            .get(&host)
151            .ok_or(ScatterProxyError::UnknownHost(host))
152    }
153}