1mod circuit_breaker;
6mod classifier;
7mod config;
8mod error;
9mod health;
10mod metrics;
11mod persist;
12mod proxy;
13mod rate_limit;
14mod scheduler;
15mod score;
16mod task;
17
18pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
20pub use config::{RateLimitConfig, ScatterProxyConfig, DEFAULT_PROXY_SOURCES};
21pub use error::ScatterProxyError;
22pub use metrics::{PoolMetrics, ProxyHostStats};
23pub use task::{ScatterResponse, TaskHandle};
24
25pub use http::HeaderMap;
27pub use http::StatusCode;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::Duration;
32use tokio::sync::{broadcast, Semaphore};
33use tokio::task::JoinHandle;
34use tracing::{debug, info, warn};
35
36#[allow(dead_code)]
38pub struct ScatterProxy {
39 config: Arc<ScatterProxyConfig>,
40 task_pool: Arc<task::TaskPool>,
41 health: Arc<health::HealthTracker>,
42 rate_limiter: Arc<rate_limit::RateLimiter>,
43 circuit_breakers: Arc<circuit_breaker::CircuitBreakerManager>,
44 proxy_manager: Arc<proxy::ProxyManager>,
45 throughput: Arc<metrics::ThroughputTracker>,
46 semaphore: Arc<Semaphore>,
47 shutdown_tx: broadcast::Sender<()>,
48 _scheduler_handle: JoinHandle<()>,
50 _persist_handle: Option<JoinHandle<()>>,
51 _metrics_handle: JoinHandle<()>,
52 _refresh_handle: JoinHandle<()>,
53}
54
55impl ScatterProxy {
56 pub async fn new(
62 config: ScatterProxyConfig,
63 classifier: impl BodyClassifier,
64 ) -> Result<Self, ScatterProxyError> {
65 let config = Arc::new(config);
66
67 let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
69 let health = Arc::new(health::HealthTracker::new(config.health_window));
70 let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
71 let circuit_breakers = Arc::new(circuit_breaker::CircuitBreakerManager::new(
72 config.circuit_breaker_threshold,
73 config.circuit_breaker_probe_interval,
74 ));
75 let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
76 let throughput = Arc::new(metrics::ThroughputTracker::new());
77 let semaphore = Arc::new(Semaphore::new(config.max_inflight));
78 let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
79
80 let effective_sources: Vec<String> = if config.sources.is_empty() {
82 info!("no custom sources configured, using default free proxy sources");
83 DEFAULT_PROXY_SOURCES
84 .iter()
85 .map(|s| s.to_string())
86 .collect()
87 } else {
88 config.sources.clone()
89 };
90
91 let proxy_count = proxy_manager
93 .fetch_and_add(&effective_sources, config.prefer_remote_dns)
94 .await?;
95
96 if let Some(ref state_path) = config.state_file {
98 match persist::load_state(state_path).await {
99 Ok(Some(persisted)) => {
100 for (proxy_url, persisted_proxy) in &persisted.proxies {
101 for (host, stats) in &persisted_proxy.hosts {
103 health.restore(proxy_url, host, stats);
104 }
105 match persisted_proxy.state.as_str() {
107 "Active" => {
108 proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
109 }
110 "Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
111 _ => {} }
113 }
114 debug!(
115 proxies = persisted.proxies.len(),
116 "restored persisted state"
117 );
118 }
119 Ok(None) => {
120 debug!("no persisted state file found, starting fresh");
121 }
122 Err(e) => {
123 warn!(error = %e, "failed to load persisted state, starting fresh");
124 }
125 }
126 }
127
128 info!(count = proxy_count, "loaded proxies from sources");
129
130 let (shutdown_tx, _) = broadcast::channel(1);
132
133 let sched = scheduler::Scheduler::new(
137 Arc::clone(&config),
138 Arc::clone(&task_pool),
139 Arc::clone(&health),
140 Arc::clone(&rate_limiter),
141 Arc::clone(&circuit_breakers),
142 Arc::clone(&proxy_manager),
143 Arc::clone(&classifier),
144 Arc::clone(&semaphore),
145 Arc::clone(&throughput),
146 );
147 let shutdown_rx_sched = shutdown_tx.subscribe();
148 let _scheduler_handle = tokio::spawn(async move {
149 sched.run(shutdown_rx_sched).await;
150 });
151
152 let _persist_handle = if config.state_file.is_some() {
154 let persist_config = Arc::clone(&config);
155 let persist_health = Arc::clone(&health);
156 let persist_proxy_mgr = Arc::clone(&proxy_manager);
157 let mut shutdown_rx_persist = shutdown_tx.subscribe();
158 let interval = config.state_save_interval;
159
160 Some(tokio::spawn(async move {
161 loop {
162 tokio::select! {
163 _ = shutdown_rx_persist.recv() => {
164 debug!("persist task received shutdown signal");
165 break;
166 }
167 _ = tokio::time::sleep(interval) => {
168 if let Some(ref path) = persist_config.state_file {
169 let health_stats = persist_health.get_all_stats();
170 let proxy_states = build_proxy_states(&persist_proxy_mgr);
171 if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
172 warn!(error = %e, "failed to save state");
173 } else {
174 debug!("persisted state to disk");
175 }
176 }
177 }
178 }
179 }
180 }))
181 } else {
182 None
183 };
184
185 let _metrics_config = Arc::clone(&config);
187 let metrics_task_pool = Arc::clone(&task_pool);
188 let metrics_health = Arc::clone(&health);
189 let metrics_proxy_mgr = Arc::clone(&proxy_manager);
190 let metrics_throughput = Arc::clone(&throughput);
191 let metrics_semaphore = Arc::clone(&semaphore);
192 let metrics_cb = Arc::clone(&circuit_breakers);
193 let mut shutdown_rx_metrics = shutdown_tx.subscribe();
194 let metrics_interval = config.metrics_log_interval;
195 let max_inflight = config.max_inflight;
196
197 let _metrics_handle = tokio::spawn(async move {
198 loop {
199 tokio::select! {
200 _ = shutdown_rx_metrics.recv() => {
201 debug!("metrics task received shutdown signal");
202 break;
203 }
204 _ = tokio::time::sleep(metrics_interval) => {
205 let tp = metrics_throughput.throughput(Duration::from_secs(10));
206 let total_s = metrics_health.total_success();
207 let total_f = metrics_health.total_fail();
208 let total = total_s + total_f;
209 let success_pct = if total > 0 {
210 (total_s as f64 / total as f64) * 100.0
211 } else {
212 0.0
213 };
214 let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
215 let pending = metrics_task_pool.pending_count();
216 let done = metrics_task_pool.completed_count();
217 let failed = metrics_task_pool.failed_count();
218 let inflight = max_inflight - metrics_semaphore.available_permits();
219 let breakers = metrics_cb.get_all();
220 let open_breakers: Vec<&String> = breakers.iter()
221 .filter(|(_, &open)| open)
222 .map(|(h, _)| h)
223 .collect();
224
225 info!(
226 "throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} done {} failed | inflight={} | breakers: {}",
227 tp,
228 success_pct,
229 healthy,
230 cooldown,
231 dead,
232 pending,
233 done,
234 failed,
235 inflight,
236 if open_breakers.is_empty() {
237 "none".to_string()
238 } else {
239 format!("{:?}", open_breakers)
240 }
241 );
242 }
243 }
244 }
245 });
246
247 let effective_sources = Arc::new(effective_sources);
249 let refresh_sources = Arc::clone(&effective_sources);
250 let refresh_config = Arc::clone(&config);
251 let refresh_proxy_mgr = Arc::clone(&proxy_manager);
252 let mut shutdown_rx_refresh = shutdown_tx.subscribe();
253 let refresh_interval = config.source_refresh_interval;
254
255 let _refresh_handle = tokio::spawn(async move {
256 loop {
257 tokio::select! {
258 _ = shutdown_rx_refresh.recv() => {
259 debug!("refresh task received shutdown signal");
260 break;
261 }
262 _ = tokio::time::sleep(refresh_interval) => {
263 match refresh_proxy_mgr
264 .fetch_and_add(&refresh_sources, refresh_config.prefer_remote_dns)
265 .await
266 {
267 Ok(count) => {
268 debug!(new_count = count, "refreshed proxy sources");
269 }
270 Err(e) => {
271 warn!(error = %e, "failed to refresh proxy sources");
272 }
273 }
274 }
275 }
276 }
277 });
278
279 Ok(ScatterProxy {
280 config,
281 task_pool,
282 health,
283 rate_limiter,
284 circuit_breakers,
285 proxy_manager,
286 throughput,
287 semaphore,
288 shutdown_tx,
289 _scheduler_handle,
290 _persist_handle,
291 _metrics_handle,
292 _refresh_handle,
293 })
294 }
295
296 pub fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
301 self.task_pool
302 .submit(request, self.config.max_attempts, self.config.task_timeout)
303 }
304
305 pub fn submit_batch(
310 &self,
311 requests: Vec<reqwest::Request>,
312 ) -> Result<Vec<TaskHandle>, ScatterProxyError> {
313 self.task_pool
314 .submit_batch(requests, self.config.max_attempts, self.config.task_timeout)
315 }
316
317 pub fn metrics(&self) -> PoolMetrics {
319 let (total, healthy, cooldown, dead) = self.proxy_manager.proxy_counts();
320
321 let total_s = self.health.total_success();
322 let total_f = self.health.total_fail();
323 let total_requests = total_s + total_f;
324 let success_rate = if total_requests > 0 {
325 total_s as f64 / total_requests as f64
326 } else {
327 0.0
328 };
329
330 PoolMetrics {
331 total_proxies: total,
332 healthy_proxies: healthy,
333 cooldown_proxies: cooldown,
334 dead_proxies: dead,
335
336 pending_tasks: self.task_pool.pending_count(),
337 completed_tasks: self.task_pool.completed_count(),
338 failed_tasks: self.task_pool.failed_count(),
339
340 throughput_1s: self.throughput.throughput(Duration::from_secs(1)),
341 throughput_10s: self.throughput.throughput(Duration::from_secs(10)),
342 throughput_60s: self.throughput.throughput(Duration::from_secs(60)),
343
344 success_rate_1m: success_rate,
345 avg_latency_ms: self.health.avg_latency_ms(),
346
347 inflight: self.config.max_inflight - self.semaphore.available_permits(),
348 circuit_breakers: self.circuit_breakers.get_all(),
349 }
350 }
351
352 pub async fn shutdown(self) {
355 info!("initiating scatter-proxy shutdown");
356
357 let _ = self.shutdown_tx.send(());
359
360 if let Some(ref path) = self.config.state_file {
362 let health_stats = self.health.get_all_stats();
363 let proxy_states = build_proxy_states(&self.proxy_manager);
364 if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
365 warn!(error = %e, "failed to save final state during shutdown");
366 } else {
367 debug!("saved final state to disk");
368 }
369 }
370
371 info!("scatter-proxy shutdown complete");
372 }
375}
376
377fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
379 let urls = proxy_manager.all_proxy_urls();
380 let mut states = HashMap::with_capacity(urls.len());
381 for url in urls {
382 let state = proxy_manager.get_state(&url);
383 let label = match state {
384 proxy::ProxyState::Active => "Active",
385 proxy::ProxyState::Dead => "Dead",
386 proxy::ProxyState::Unknown => "Unknown",
387 };
388 states.insert(url, label.to_string());
389 }
390 states
391}