scatter_proxy/router.rs
1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tracing::info;
5
6use crate::classifier::BodyClassifier;
7use crate::config::ScatterProxyConfig;
8use crate::error::ScatterProxyError;
9use crate::metrics::PoolMetrics;
10use crate::task::TaskHandle;
11use crate::ScatterProxy;
12
13/// Routes proxied requests to per-host [`ScatterProxy`] instances.
14///
15/// Each registered host gets its own independent proxy pool, health tracker,
16/// and scheduler. Isolation means:
17///
18/// - A struggling host (all proxies in cooldown, low success rate) cannot
19/// starve tasks for other hosts.
20/// - Proxy eviction is scoped per-host: a proxy that repeatedly fails for
21/// host A is marked dead only in A's pool, while remaining available to B.
22/// - [`metrics_for`] gives a clean, host-scoped view of throughput and health.
23///
24/// Each host is configured independently via its own [`ScatterProxyConfig`],
25/// so you can tune sources, concurrency, timeouts, and logging per target.
26///
27/// # Example
28///
29/// ```no_run
30/// use scatter_proxy::{ScatterProxyRouter, ScatterProxyConfig, DefaultClassifier};
31///
32/// # async fn example() -> Result<(), scatter_proxy::ScatterProxyError> {
33/// let router = ScatterProxyRouter::new(
34/// [
35/// ("szse.cn", ScatterProxyConfig { max_inflight: 20, ..Default::default() }),
36/// ("sse.com.cn", ScatterProxyConfig { max_inflight: 10, ..Default::default() }),
37/// ("cninfo.com.cn", ScatterProxyConfig::default()),
38/// ],
39/// DefaultClassifier,
40/// ).await?;
41///
42/// let client = reqwest::Client::new();
43/// let req = client.get("http://szse.cn/api/data").build().unwrap();
44/// let handle = router.submit(req).await?;
45/// // handle.await or handle.with_timeout(…) as usual
46///
47/// // Per-host observability
48/// if let Some(m) = router.metrics_for("szse.cn") {
49/// println!("szse success rate: {:.0}%", m.success_rate_1m * 100.0);
50/// }
51///
52/// router.shutdown().await;
53/// # Ok(())
54/// # }
55/// ```
56///
57/// [`metrics_for`]: ScatterProxyRouter::metrics_for
58pub struct ScatterProxyRouter {
59 routes: HashMap<String, ScatterProxy>,
60}
61
62impl ScatterProxyRouter {
63 /// Build a router with one independent [`ScatterProxy`] pool per host.
64 ///
65 /// `routes` is an iterable of `(hostname, config)` pairs. Each host gets
66 /// its own pool configured independently. If `config.name` is not set it
67 /// defaults to the hostname so that metrics logs are prefixed correctly.
68 ///
69 /// # Errors
70 ///
71 /// Returns [`ScatterProxyError::Init`] if any host pool fails to initialise
72 /// (e.g. the proxy source URL is unreachable and the list is empty).
73 pub async fn new(
74 routes: impl IntoIterator<Item = (impl Into<String>, ScatterProxyConfig)>,
75 classifier: impl BodyClassifier,
76 ) -> Result<Self, ScatterProxyError> {
77 let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
78 let mut map = HashMap::new();
79
80 for (host, mut config) in routes {
81 let host = host.into();
82 if config.name.is_none() {
83 config.name = Some(host.clone());
84 }
85 let sp = ScatterProxy::new_arc(config, Arc::clone(&classifier)).await?;
86 map.insert(host, sp);
87 }
88
89 info!(hosts = map.len(), "ScatterProxyRouter initialised");
90 Ok(Self { routes: map })
91 }
92
93 /// Submit a request to the pool registered for its host.
94 ///
95 /// Blocks until the pool has capacity, then returns a [`TaskHandle`].
96 ///
97 /// # Errors
98 ///
99 /// Returns [`ScatterProxyError::UnknownHost`] if the request's host is not
100 /// registered in this router.
101 pub async fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
102 let sp = self.route(&request)?;
103 Ok(sp.submit(request).await)
104 }
105
106 /// Non-blocking submit variant.
107 ///
108 /// # Errors
109 ///
110 /// Returns [`ScatterProxyError::UnknownHost`] if the host is not registered,
111 /// or [`ScatterProxyError::PoolFull`] if the host's pool is at capacity.
112 pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
113 let sp = self.route(&request)?;
114 sp.try_submit(request)
115 }
116
117 /// Returns a [`PoolMetrics`] snapshot for the given host.
118 ///
119 /// Returns `None` if the host is not registered in this router.
120 pub fn metrics_for(&self, host: &str) -> Option<PoolMetrics> {
121 self.routes.get(host).map(|sp| sp.metrics())
122 }
123
124 /// Returns [`PoolMetrics`] snapshots for every registered host.
125 pub fn all_metrics(&self) -> HashMap<String, PoolMetrics> {
126 self.routes
127 .iter()
128 .map(|(host, sp)| (host.clone(), sp.metrics()))
129 .collect()
130 }
131
132 /// Returns the list of registered hostnames.
133 pub fn hosts(&self) -> Vec<&str> {
134 self.routes.keys().map(String::as_str).collect()
135 }
136
137 /// Gracefully shut down all host pools (persists state for each if configured).
138 pub async fn shutdown(self) {
139 for (_, sp) in self.routes {
140 sp.shutdown().await;
141 }
142 }
143
144 fn route(&self, request: &reqwest::Request) -> Result<&ScatterProxy, ScatterProxyError> {
145 let host = request.url().host_str().unwrap_or("").to_string();
146 self.routes
147 .get(&host)
148 .ok_or(ScatterProxyError::UnknownHost(host))
149 }
150}