1mod circuit_breaker;
6mod classifier;
7mod config;
8mod error;
9mod health;
10mod metrics;
11mod persist;
12mod proxy;
13mod rate_limit;
14mod scheduler;
15mod score;
16mod task;
17
18pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
20pub use config::{RateLimitConfig, ScatterProxyConfig};
21pub use error::ScatterProxyError;
22pub use metrics::{PoolMetrics, ProxyHostStats};
23pub use task::{ScatterResponse, TaskHandle};
24
25pub use http::HeaderMap;
27pub use http::StatusCode;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::Duration;
32use tokio::sync::{broadcast, Semaphore};
33use tokio::task::JoinHandle;
34use tracing::{debug, info, warn};
35
/// Owner of the shared proxy-pool components and of the background tasks
/// started by [`ScatterProxy::new`].
///
/// Requests are queued through [`ScatterProxy::submit`] /
/// [`ScatterProxy::submit_batch`], a snapshot of the pool is available via
/// [`ScatterProxy::metrics`], and [`ScatterProxy::shutdown`] signals the
/// background tasks to stop and writes the final state file (if configured).
// NOTE(review): `dead_code` is presumably allowed because some fields (for
// example `rate_limiter`) are only stored here and never read back through
// `self` in this file — confirm before removing the attribute.
#[allow(dead_code)]
pub struct ScatterProxy {
    /// Configuration, shared with the background tasks.
    config: Arc<ScatterProxyConfig>,
    /// Pool that receives submitted requests and reports pending / completed /
    /// failed task counts.
    task_pool: Arc<task::TaskPool>,
    /// Success / failure / latency statistics; restored from and saved to the
    /// state file when `config.state_file` is set.
    health: Arc<health::HealthTracker>,
    /// Rate limiter built from `config.rate_limit` and handed to the scheduler.
    rate_limiter: Arc<rate_limit::RateLimiter>,
    /// Circuit breakers built from `config.circuit_breaker_threshold` and
    /// `config.circuit_breaker_probe_interval`; reported in metrics.
    circuit_breakers: Arc<circuit_breaker::CircuitBreakerManager>,
    /// Registry of proxies fetched from `config.sources` and their states.
    proxy_manager: Arc<proxy::ProxyManager>,
    /// Throughput tracker queried over 1 s / 10 s / 60 s windows for metrics.
    throughput: Arc<metrics::ThroughputTracker>,
    /// Semaphore created with `config.max_inflight` permits; the in-flight
    /// count is derived as `max_inflight` minus the available permits.
    semaphore: Arc<Semaphore>,
    /// Broadcast sender used by `shutdown` to tell every background task to stop.
    shutdown_tx: broadcast::Sender<()>,
    /// Handle of the spawned scheduler task.
    _scheduler_handle: JoinHandle<()>,
    /// Handle of the periodic state-persistence task; `None` when
    /// `config.state_file` is not set.
    _persist_handle: Option<JoinHandle<()>>,
    /// Handle of the periodic metrics-logging task.
    _metrics_handle: JoinHandle<()>,
    /// Handle of the periodic proxy-source refresh task.
    _refresh_handle: JoinHandle<()>,
}
54
55impl ScatterProxy {
56 pub async fn new(
62 config: ScatterProxyConfig,
63 classifier: impl BodyClassifier,
64 ) -> Result<Self, ScatterProxyError> {
65 let config = Arc::new(config);
66
67 let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
69 let health = Arc::new(health::HealthTracker::new(config.health_window));
70 let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
71 let circuit_breakers = Arc::new(circuit_breaker::CircuitBreakerManager::new(
72 config.circuit_breaker_threshold,
73 config.circuit_breaker_probe_interval,
74 ));
75 let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
76 let throughput = Arc::new(metrics::ThroughputTracker::new());
77 let semaphore = Arc::new(Semaphore::new(config.max_inflight));
78 let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
79
80 let proxy_count = proxy_manager
82 .fetch_and_add(&config.sources, config.prefer_remote_dns)
83 .await?;
84
85 if let Some(ref state_path) = config.state_file {
87 match persist::load_state(state_path).await {
88 Ok(Some(persisted)) => {
89 for (proxy_url, persisted_proxy) in &persisted.proxies {
90 for (host, stats) in &persisted_proxy.hosts {
92 health.restore(proxy_url, host, stats);
93 }
94 match persisted_proxy.state.as_str() {
96 "Active" => {
97 proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
98 }
99 "Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
100 _ => {} }
102 }
103 debug!(
104 proxies = persisted.proxies.len(),
105 "restored persisted state"
106 );
107 }
108 Ok(None) => {
109 debug!("no persisted state file found, starting fresh");
110 }
111 Err(e) => {
112 warn!(error = %e, "failed to load persisted state, starting fresh");
113 }
114 }
115 }
116
117 info!(count = proxy_count, "loaded proxies from sources");
118
119 let (shutdown_tx, _) = broadcast::channel(1);
121
122 let sched = scheduler::Scheduler::new(
126 Arc::clone(&config),
127 Arc::clone(&task_pool),
128 Arc::clone(&health),
129 Arc::clone(&rate_limiter),
130 Arc::clone(&circuit_breakers),
131 Arc::clone(&proxy_manager),
132 Arc::clone(&classifier),
133 Arc::clone(&semaphore),
134 Arc::clone(&throughput),
135 );
136 let shutdown_rx_sched = shutdown_tx.subscribe();
137 let _scheduler_handle = tokio::spawn(async move {
138 sched.run(shutdown_rx_sched).await;
139 });
140
141 let _persist_handle = if config.state_file.is_some() {
143 let persist_config = Arc::clone(&config);
144 let persist_health = Arc::clone(&health);
145 let persist_proxy_mgr = Arc::clone(&proxy_manager);
146 let mut shutdown_rx_persist = shutdown_tx.subscribe();
147 let interval = config.state_save_interval;
148
149 Some(tokio::spawn(async move {
150 loop {
151 tokio::select! {
152 _ = shutdown_rx_persist.recv() => {
153 debug!("persist task received shutdown signal");
154 break;
155 }
156 _ = tokio::time::sleep(interval) => {
157 if let Some(ref path) = persist_config.state_file {
158 let health_stats = persist_health.get_all_stats();
159 let proxy_states = build_proxy_states(&persist_proxy_mgr);
160 if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
161 warn!(error = %e, "failed to save state");
162 } else {
163 debug!("persisted state to disk");
164 }
165 }
166 }
167 }
168 }
169 }))
170 } else {
171 None
172 };
173
174 let _metrics_config = Arc::clone(&config);
176 let metrics_task_pool = Arc::clone(&task_pool);
177 let metrics_health = Arc::clone(&health);
178 let metrics_proxy_mgr = Arc::clone(&proxy_manager);
179 let metrics_throughput = Arc::clone(&throughput);
180 let metrics_semaphore = Arc::clone(&semaphore);
181 let metrics_cb = Arc::clone(&circuit_breakers);
182 let mut shutdown_rx_metrics = shutdown_tx.subscribe();
183 let metrics_interval = config.metrics_log_interval;
184 let max_inflight = config.max_inflight;
185
186 let _metrics_handle = tokio::spawn(async move {
187 loop {
188 tokio::select! {
189 _ = shutdown_rx_metrics.recv() => {
190 debug!("metrics task received shutdown signal");
191 break;
192 }
193 _ = tokio::time::sleep(metrics_interval) => {
194 let tp = metrics_throughput.throughput(Duration::from_secs(10));
195 let total_s = metrics_health.total_success();
196 let total_f = metrics_health.total_fail();
197 let total = total_s + total_f;
198 let success_pct = if total > 0 {
199 (total_s as f64 / total as f64) * 100.0
200 } else {
201 0.0
202 };
203 let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
204 let pending = metrics_task_pool.pending_count();
205 let done = metrics_task_pool.completed_count();
206 let failed = metrics_task_pool.failed_count();
207 let inflight = max_inflight - metrics_semaphore.available_permits();
208 let breakers = metrics_cb.get_all();
209 let open_breakers: Vec<&String> = breakers.iter()
210 .filter(|(_, &open)| open)
211 .map(|(h, _)| h)
212 .collect();
213
214 info!(
215 "throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} done {} failed | inflight={} | breakers: {}",
216 tp,
217 success_pct,
218 healthy,
219 cooldown,
220 dead,
221 pending,
222 done,
223 failed,
224 inflight,
225 if open_breakers.is_empty() {
226 "none".to_string()
227 } else {
228 format!("{:?}", open_breakers)
229 }
230 );
231 }
232 }
233 }
234 });
235
236 let refresh_config = Arc::clone(&config);
238 let refresh_proxy_mgr = Arc::clone(&proxy_manager);
239 let mut shutdown_rx_refresh = shutdown_tx.subscribe();
240 let refresh_interval = config.source_refresh_interval;
241
242 let _refresh_handle = tokio::spawn(async move {
243 loop {
244 tokio::select! {
245 _ = shutdown_rx_refresh.recv() => {
246 debug!("refresh task received shutdown signal");
247 break;
248 }
249 _ = tokio::time::sleep(refresh_interval) => {
250 match refresh_proxy_mgr
251 .fetch_and_add(&refresh_config.sources, refresh_config.prefer_remote_dns)
252 .await
253 {
254 Ok(count) => {
255 debug!(new_count = count, "refreshed proxy sources");
256 }
257 Err(e) => {
258 warn!(error = %e, "failed to refresh proxy sources");
259 }
260 }
261 }
262 }
263 }
264 });
265
266 Ok(ScatterProxy {
267 config,
268 task_pool,
269 health,
270 rate_limiter,
271 circuit_breakers,
272 proxy_manager,
273 throughput,
274 semaphore,
275 shutdown_tx,
276 _scheduler_handle,
277 _persist_handle,
278 _metrics_handle,
279 _refresh_handle,
280 })
281 }
282
283 pub fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
288 self.task_pool
289 .submit(request, self.config.max_attempts, self.config.task_timeout)
290 }
291
292 pub fn submit_batch(
297 &self,
298 requests: Vec<reqwest::Request>,
299 ) -> Result<Vec<TaskHandle>, ScatterProxyError> {
300 self.task_pool
301 .submit_batch(requests, self.config.max_attempts, self.config.task_timeout)
302 }
303
304 pub fn metrics(&self) -> PoolMetrics {
306 let (total, healthy, cooldown, dead) = self.proxy_manager.proxy_counts();
307
308 let total_s = self.health.total_success();
309 let total_f = self.health.total_fail();
310 let total_requests = total_s + total_f;
311 let success_rate = if total_requests > 0 {
312 total_s as f64 / total_requests as f64
313 } else {
314 0.0
315 };
316
317 PoolMetrics {
318 total_proxies: total,
319 healthy_proxies: healthy,
320 cooldown_proxies: cooldown,
321 dead_proxies: dead,
322
323 pending_tasks: self.task_pool.pending_count(),
324 completed_tasks: self.task_pool.completed_count(),
325 failed_tasks: self.task_pool.failed_count(),
326
327 throughput_1s: self.throughput.throughput(Duration::from_secs(1)),
328 throughput_10s: self.throughput.throughput(Duration::from_secs(10)),
329 throughput_60s: self.throughput.throughput(Duration::from_secs(60)),
330
331 success_rate_1m: success_rate,
332 avg_latency_ms: self.health.avg_latency_ms(),
333
334 inflight: self.config.max_inflight - self.semaphore.available_permits(),
335 circuit_breakers: self.circuit_breakers.get_all(),
336 }
337 }
338
339 pub async fn shutdown(self) {
342 info!("initiating scatter-proxy shutdown");
343
344 let _ = self.shutdown_tx.send(());
346
347 if let Some(ref path) = self.config.state_file {
349 let health_stats = self.health.get_all_stats();
350 let proxy_states = build_proxy_states(&self.proxy_manager);
351 if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
352 warn!(error = %e, "failed to save final state during shutdown");
353 } else {
354 debug!("saved final state to disk");
355 }
356 }
357
358 info!("scatter-proxy shutdown complete");
359 }
362}
363
364fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
366 let urls = proxy_manager.all_proxy_urls();
367 let mut states = HashMap::with_capacity(urls.len());
368 for url in urls {
369 let state = proxy_manager.get_state(&url);
370 let label = match state {
371 proxy::ProxyState::Active => "Active",
372 proxy::ProxyState::Dead => "Dead",
373 proxy::ProxyState::Unknown => "Unknown",
374 };
375 states.insert(url, label.to_string());
376 }
377 states
378}