scatter_proxy/router.rs
1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tracing::info;
5
6use crate::classifier::BodyClassifier;
7use crate::config::ScatterProxyConfig;
8use crate::error::ScatterProxyError;
9use crate::metrics::PoolMetrics;
10use crate::task::TaskHandle;
11use crate::ScatterProxy;
12
/// Routes proxied requests to per-host [`ScatterProxy`] instances.
///
/// Each registered host gets its own independent proxy pool, health tracker,
/// and scheduler. Isolation means:
///
/// - A struggling host (all proxies in cooldown, low success rate) cannot
///   starve tasks for other hosts.
/// - Proxy eviction is scoped per-host: a proxy that repeatedly fails for
///   host A is marked dead only in A's pool, while remaining available to B.
/// - [`metrics_for`] gives a clean, host-scoped view of throughput and health.
///
/// Every pool is initialised with the full proxy list from `config.sources`
/// at construction time and then evolves independently.
///
/// Routing looks up the request URL's host string verbatim in the table of
/// registered hosts. A host that was not registered — including a subdomain
/// of a registered host, such as `www.szse.cn` when only `szse.cn` is
/// registered — is rejected with `ScatterProxyError::UnknownHost`.
///
/// # Example
///
/// ```no_run
/// use scatter_proxy::{ScatterProxyRouter, ScatterProxyConfig, DefaultClassifier};
///
/// # async fn example() -> Result<(), scatter_proxy::ScatterProxyError> {
/// let router = ScatterProxyRouter::new(
///     ["szse.cn", "sse.com.cn", "cninfo.com.cn"],
///     ScatterProxyConfig::default(),
///     DefaultClassifier,
/// ).await?;
///
/// let client = reqwest::Client::new();
/// let req = client.get("http://szse.cn/api/data").build().unwrap();
/// let handle = router.submit(req).await?;
/// // handle.await or handle.with_timeout(…) as usual
///
/// // Per-host observability
/// if let Some(m) = router.metrics_for("szse.cn") {
///     println!("szse success rate: {:.0}%", m.success_rate_1m * 100.0);
/// }
///
/// router.shutdown().await;
/// # Ok(())
/// # }
/// ```
///
/// [`metrics_for`]: ScatterProxyRouter::metrics_for
pub struct ScatterProxyRouter {
    /// One pool per registered host, keyed by the host name exactly as it
    /// was passed to the constructor.
    routes: HashMap<String, ScatterProxy>,
}
58
59impl ScatterProxyRouter {
60 /// Build a router with one independent [`ScatterProxy`] pool per host.
61 ///
62 /// `hosts` is a list of bare hostnames (e.g. `"szse.cn"`). Requests
63 /// submitted to the router are matched against these names via
64 /// [`request.url().host_str()`](reqwest::Url::host_str).
65 ///
66 /// The same `config` (cloned) and `classifier` (shared via `Arc`) are used
67 /// for every pool. If you need different parameters per host, build the
68 /// pools manually and route requests yourself.
69 ///
70 /// # Errors
71 ///
72 /// Returns [`ScatterProxyError::Init`] if any host pool fails to initialise
73 /// (e.g. the proxy source URL is unreachable and the list is empty).
74 pub async fn new(
75 hosts: impl IntoIterator<Item = impl Into<String>>,
76 config: ScatterProxyConfig,
77 classifier: impl BodyClassifier,
78 ) -> Result<Self, ScatterProxyError> {
79 let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
80 let mut routes = HashMap::new();
81
82 for host in hosts {
83 let host = host.into();
84 let mut host_config = config.clone();
85 if host_config.name.is_none() {
86 host_config.name = Some(host.clone());
87 }
88 let sp = ScatterProxy::new_arc(host_config, Arc::clone(&classifier)).await?;
89 routes.insert(host, sp);
90 }
91
92 info!(hosts = routes.len(), "ScatterProxyRouter initialised");
93 Ok(Self { routes })
94 }
95
96 /// Submit a request to the pool registered for its host.
97 ///
98 /// Blocks until the pool has capacity, then returns a [`TaskHandle`].
99 ///
100 /// # Errors
101 ///
102 /// Returns [`ScatterProxyError::UnknownHost`] if the request's host is not
103 /// registered in this router.
104 pub async fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
105 let sp = self.route(&request)?;
106 Ok(sp.submit(request).await)
107 }
108
109 /// Non-blocking submit variant.
110 ///
111 /// # Errors
112 ///
113 /// Returns [`ScatterProxyError::UnknownHost`] if the host is not registered,
114 /// or [`ScatterProxyError::PoolFull`] if the host's pool is at capacity.
115 pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
116 let sp = self.route(&request)?;
117 sp.try_submit(request)
118 }
119
120 /// Returns a [`PoolMetrics`] snapshot for the given host.
121 ///
122 /// Returns `None` if the host is not registered in this router.
123 pub fn metrics_for(&self, host: &str) -> Option<PoolMetrics> {
124 self.routes.get(host).map(|sp| sp.metrics())
125 }
126
127 /// Returns [`PoolMetrics`] snapshots for every registered host.
128 pub fn all_metrics(&self) -> HashMap<String, PoolMetrics> {
129 self.routes
130 .iter()
131 .map(|(host, sp)| (host.clone(), sp.metrics()))
132 .collect()
133 }
134
135 /// Returns the list of registered hostnames.
136 pub fn hosts(&self) -> Vec<&str> {
137 self.routes.keys().map(String::as_str).collect()
138 }
139
140 /// Gracefully shut down all host pools (persists state for each if configured).
141 pub async fn shutdown(self) {
142 for (_, sp) in self.routes {
143 sp.shutdown().await;
144 }
145 }
146
147 fn route(&self, request: &reqwest::Request) -> Result<&ScatterProxy, ScatterProxyError> {
148 let host = request.url().host_str().unwrap_or("").to_string();
149 self.routes
150 .get(&host)
151 .ok_or(ScatterProxyError::UnknownHost(host))
152 }
153}