Skip to main content

scatter_proxy/
router.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tracing::info;
5
6use crate::classifier::BodyClassifier;
7use crate::config::ScatterProxyConfig;
8use crate::error::ScatterProxyError;
9use crate::metrics::PoolMetrics;
10use crate::task::TaskHandle;
11use crate::ScatterProxy;
12
13/// Routes proxied requests to per-host [`ScatterProxy`] instances.
14///
15/// Each registered host gets its own independent proxy pool, health tracker,
16/// and scheduler.  Isolation means:
17///
18/// - A struggling host (all proxies in cooldown, low success rate) cannot
19///   starve tasks for other hosts.
20/// - Proxy eviction is scoped per-host: a proxy that repeatedly fails for
21///   host A is marked dead only in A's pool, while remaining available to B.
22/// - [`metrics_for`] gives a clean, host-scoped view of throughput and health.
23///
24/// Each host is configured independently via its own [`ScatterProxyConfig`],
25/// so you can tune sources, concurrency, timeouts, and logging per target.
26///
27/// # Example
28///
29/// ```no_run
30/// use scatter_proxy::{ScatterProxyRouter, ScatterProxyConfig, DefaultClassifier};
31///
32/// # async fn example() -> Result<(), scatter_proxy::ScatterProxyError> {
33/// let router = ScatterProxyRouter::new(
34///     [
35///         ("szse.cn",     ScatterProxyConfig { max_inflight: 20, ..Default::default() }),
36///         ("sse.com.cn",  ScatterProxyConfig { max_inflight: 10, ..Default::default() }),
37///         ("cninfo.com.cn", ScatterProxyConfig::default()),
38///     ],
39///     DefaultClassifier,
40/// ).await?;
41///
42/// let client = reqwest::Client::new();
43/// let req = client.get("http://szse.cn/api/data").build().unwrap();
44/// let handle = router.submit(req).await?;
45/// // handle.await or handle.with_timeout(…) as usual
46///
47/// // Per-host observability
48/// if let Some(m) = router.metrics_for("szse.cn") {
49///     println!("szse success rate: {:.0}%", m.success_rate_1m * 100.0);
50/// }
51///
52/// router.shutdown().await;
53/// # Ok(())
54/// # }
55/// ```
56///
57/// [`metrics_for`]: ScatterProxyRouter::metrics_for
58pub struct ScatterProxyRouter {
59    routes: HashMap<String, ScatterProxy>,
60}
61
62impl ScatterProxyRouter {
63    /// Build a router with one independent [`ScatterProxy`] pool per host.
64    ///
65    /// `routes` is an iterable of `(hostname, config)` pairs.  Each host gets
66    /// its own pool configured independently.  If `config.name` is not set it
67    /// defaults to the hostname so that metrics logs are prefixed correctly.
68    ///
69    /// # Errors
70    ///
71    /// Returns [`ScatterProxyError::Init`] if any host pool fails to initialise
72    /// (e.g. the proxy source URL is unreachable and the list is empty).
73    pub async fn new(
74        routes: impl IntoIterator<Item = (impl Into<String>, ScatterProxyConfig)>,
75        classifier: impl BodyClassifier,
76    ) -> Result<Self, ScatterProxyError> {
77        let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
78        let mut map = HashMap::new();
79
80        for (host, mut config) in routes {
81            let host = host.into();
82            if config.name.is_none() {
83                config.name = Some(host.clone());
84            }
85            let sp = ScatterProxy::new_arc(config, Arc::clone(&classifier)).await?;
86            map.insert(host, sp);
87        }
88
89        info!(hosts = map.len(), "ScatterProxyRouter initialised");
90        Ok(Self { routes: map })
91    }
92
93    /// Submit a request to the pool registered for its host.
94    ///
95    /// Blocks until the pool has capacity, then returns a [`TaskHandle`].
96    ///
97    /// # Errors
98    ///
99    /// Returns [`ScatterProxyError::UnknownHost`] if the request's host is not
100    /// registered in this router.
101    pub async fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
102        let sp = self.route(&request)?;
103        Ok(sp.submit(request).await)
104    }
105
106    /// Non-blocking submit variant.
107    ///
108    /// # Errors
109    ///
110    /// Returns [`ScatterProxyError::UnknownHost`] if the host is not registered,
111    /// or [`ScatterProxyError::PoolFull`] if the host's pool is at capacity.
112    pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
113        let sp = self.route(&request)?;
114        sp.try_submit(request)
115    }
116
117    /// Returns a [`PoolMetrics`] snapshot for the given host.
118    ///
119    /// Returns `None` if the host is not registered in this router.
120    pub fn metrics_for(&self, host: &str) -> Option<PoolMetrics> {
121        self.routes.get(host).map(|sp| sp.metrics())
122    }
123
124    /// Returns [`PoolMetrics`] snapshots for every registered host.
125    pub fn all_metrics(&self) -> HashMap<String, PoolMetrics> {
126        self.routes
127            .iter()
128            .map(|(host, sp)| (host.clone(), sp.metrics()))
129            .collect()
130    }
131
132    /// Returns the list of registered hostnames.
133    pub fn hosts(&self) -> Vec<&str> {
134        self.routes.keys().map(String::as_str).collect()
135    }
136
137    /// Gracefully shut down all host pools (persists state for each if configured).
138    pub async fn shutdown(self) {
139        for (_, sp) in self.routes {
140            sp.shutdown().await;
141        }
142    }
143
144    fn route(&self, request: &reqwest::Request) -> Result<&ScatterProxy, ScatterProxyError> {
145        let host = request.url().host_str().unwrap_or("").to_string();
146        self.routes
147            .get(&host)
148            .ok_or(ScatterProxyError::UnknownHost(host))
149    }
150}