1mod classifier;
6mod config;
7mod error;
8mod health;
9mod metrics;
10mod persist;
11mod proxy;
12mod rate_limit;
13mod router;
14mod scheduler;
15mod score;
16mod task;
17
18pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
20pub use config::{RateLimitConfig, ScatterProxyConfig, DEFAULT_PROXY_SOURCES};
21pub use error::ScatterProxyError;
22pub use metrics::{PoolMetrics, ProxyHostStats};
23pub use router::ScatterProxyRouter;
24pub use task::{ScatterResponse, TaskHandle};
25
26pub use http::HeaderMap;
28pub use http::StatusCode;
29
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33use tokio::sync::{broadcast, Semaphore};
34use tokio::task::JoinHandle;
35use tracing::{debug, info, warn};
36
/// Handle to a running scatter-proxy instance.
///
/// Owns the shared components built by the constructor (task pool, health
/// tracker, rate limiter, proxy manager, throughput tracker, in-flight
/// semaphore) together with the shutdown channel and the join handles of the
/// background tasks spawned at construction time.
// At least `rate_limiter` is never read through `self` in the methods visible
// in this file, which is presumably why the lint is silenced here.
#[allow(dead_code)]
pub struct ScatterProxy {
    /// Configuration shared with the background tasks.
    config: Arc<ScatterProxyConfig>,
    /// Task pool that every `submit*` method delegates to.
    task_pool: Arc<task::TaskPool>,
    /// Health tracker; supplies success/failure totals and average latency for
    /// metrics and is the source of the persisted health statistics.
    health: Arc<health::HealthTracker>,
    /// Rate limiter built from `config.rate_limit` and shared with the scheduler.
    rate_limiter: Arc<rate_limit::RateLimiter>,
    /// Proxy manager; fetches proxies from the sources and holds their states.
    proxy_manager: Arc<proxy::ProxyManager>,
    /// Throughput tracker queried for the 1s / 10s / 60s throughput figures.
    throughput: Arc<metrics::ThroughputTracker>,
    /// Semaphore created with `config.max_inflight` permits; the in-flight
    /// figure is derived as `max_inflight - available_permits()`.
    semaphore: Arc<Semaphore>,
    /// Broadcast sender used to tell every background task to stop.
    shutdown_tx: broadcast::Sender<()>,
    /// Join handle of the scheduler task.
    _scheduler_handle: JoinHandle<()>,
    /// Join handle of the periodic state-persistence task; `None` when no
    /// state file is configured.
    _persist_handle: Option<JoinHandle<()>>,
    /// Join handle of the periodic metrics-logging task.
    _metrics_handle: JoinHandle<()>,
    /// Join handle of the periodic proxy-source refresh task.
    _refresh_handle: JoinHandle<()>,
}
54
55impl ScatterProxy {
56 pub async fn new(
62 config: ScatterProxyConfig,
63 classifier: impl BodyClassifier,
64 ) -> Result<Self, ScatterProxyError> {
65 Self::new_arc(config, Arc::new(classifier)).await
66 }
67
68 pub(crate) async fn new_arc(
73 config: ScatterProxyConfig,
74 classifier: Arc<dyn BodyClassifier>,
75 ) -> Result<Self, ScatterProxyError> {
76 let metrics_name = config.name.clone();
77 let config = Arc::new(config);
78
79 let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
81 let health = Arc::new(health::HealthTracker::new(config.health_window));
82 let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
83 let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
84 let throughput = Arc::new(metrics::ThroughputTracker::new());
85 let semaphore = Arc::new(Semaphore::new(config.max_inflight));
86
87 let effective_sources: Vec<String> = if config.sources.is_empty() {
89 info!("no custom sources configured, using default free proxy sources");
90 DEFAULT_PROXY_SOURCES
91 .iter()
92 .map(|s| s.to_string())
93 .collect()
94 } else {
95 config.sources.clone()
96 };
97
98 let proxy_count = proxy_manager
100 .fetch_and_add(&effective_sources, config.prefer_remote_dns)
101 .await?;
102
103 if let Some(ref state_path) = config.state_file {
105 match persist::load_state(state_path).await {
106 Ok(Some(persisted)) => {
107 for (proxy_url, persisted_proxy) in &persisted.proxies {
108 for (host, stats) in &persisted_proxy.hosts {
110 health.restore(proxy_url, host, stats);
111 }
112 match persisted_proxy.state.as_str() {
114 "Active" => {
115 proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
116 }
117 "Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
118 _ => {} }
120 }
121 debug!(
122 proxies = persisted.proxies.len(),
123 "restored persisted state"
124 );
125 }
126 Ok(None) => {
127 debug!("no persisted state file found, starting fresh");
128 }
129 Err(e) => {
130 warn!(error = %e, "failed to load persisted state, starting fresh");
131 }
132 }
133 }
134
135 info!(count = proxy_count, "loaded proxies from sources");
136
137 let (shutdown_tx, _) = broadcast::channel(1);
139
140 let sched = scheduler::Scheduler::new(
144 Arc::clone(&config),
145 Arc::clone(&task_pool),
146 Arc::clone(&health),
147 Arc::clone(&rate_limiter),
148 Arc::clone(&proxy_manager),
149 Arc::clone(&classifier),
150 Arc::clone(&semaphore),
151 Arc::clone(&throughput),
152 );
153 let shutdown_rx_sched = shutdown_tx.subscribe();
154 let _scheduler_handle = tokio::spawn(async move {
155 sched.run(shutdown_rx_sched).await;
156 });
157
158 let _persist_handle = if config.state_file.is_some() {
160 let persist_config = Arc::clone(&config);
161 let persist_health = Arc::clone(&health);
162 let persist_proxy_mgr = Arc::clone(&proxy_manager);
163 let mut shutdown_rx_persist = shutdown_tx.subscribe();
164 let interval = config.state_save_interval;
165
166 Some(tokio::spawn(async move {
167 loop {
168 tokio::select! {
169 _ = shutdown_rx_persist.recv() => {
170 debug!("persist task received shutdown signal");
171 break;
172 }
173 _ = tokio::time::sleep(interval) => {
174 if let Some(ref path) = persist_config.state_file {
175 let health_stats = persist_health.get_all_stats();
176 let proxy_states = build_proxy_states(&persist_proxy_mgr);
177 if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
178 warn!(error = %e, "failed to save state");
179 } else {
180 debug!("persisted state to disk");
181 }
182 }
183 }
184 }
185 }
186 }))
187 } else {
188 None
189 };
190
191 let _metrics_config = Arc::clone(&config);
193 let metrics_task_pool = Arc::clone(&task_pool);
194 let metrics_health = Arc::clone(&health);
195 let metrics_proxy_mgr = Arc::clone(&proxy_manager);
196 let metrics_throughput = Arc::clone(&throughput);
197 let metrics_semaphore = Arc::clone(&semaphore);
198 let mut shutdown_rx_metrics = shutdown_tx.subscribe();
199 let metrics_interval = config.metrics_log_interval;
200 let max_inflight = config.max_inflight;
201
202 let _metrics_handle = tokio::spawn(async move {
203 let prefix = metrics_name
204 .as_deref()
205 .map_or_else(String::new, |n| format!("[{n}] "));
206 loop {
207 tokio::select! {
208 _ = shutdown_rx_metrics.recv() => {
209 debug!("metrics task received shutdown signal");
210 break;
211 }
212 _ = tokio::time::sleep(metrics_interval) => {
213 let tp = metrics_throughput.throughput(Duration::from_secs(10));
214 let total_s = metrics_health.total_success();
215 let total_f = metrics_health.total_fail();
216 let total = total_s + total_f;
217 let success_pct = if total > 0 {
218 (total_s as f64 / total as f64) * 100.0
219 } else {
220 0.0
221 };
222 let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
223 let pending = metrics_task_pool.pending_count();
224 let delayed = metrics_task_pool.delayed_count();
225 let done = metrics_task_pool.completed_count();
226 let failed = metrics_task_pool.failed_count();
227 let requeued = metrics_task_pool.requeued_count();
228 let zero_available = metrics_task_pool.zero_available_count();
229 let skipped_no_permit = metrics_task_pool.skipped_no_permit_count();
230 let skipped_rate_limit = metrics_task_pool.skipped_rate_limit_count();
231 let skipped_cooldown = metrics_task_pool.skipped_cooldown_count();
232 let dispatches = metrics_task_pool.dispatch_count();
233 let inflight = max_inflight - metrics_semaphore.available_permits();
234 info!(
235 "{prefix}throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} delayed {} done {} failed requeued={} zero_avail={} dispatch={} skip(no_permit/rate/cooldown)={}/{}/{} | inflight={}",
236 tp,
237 success_pct,
238 healthy,
239 cooldown,
240 dead,
241 pending,
242 delayed,
243 done,
244 failed,
245 requeued,
246 zero_available,
247 dispatches,
248 skipped_no_permit,
249 skipped_rate_limit,
250 skipped_cooldown,
251 inflight
252 );
253 }
254 }
255 }
256 });
257
258 let effective_sources = Arc::new(effective_sources);
260 let refresh_sources = Arc::clone(&effective_sources);
261 let refresh_config = Arc::clone(&config);
262 let refresh_proxy_mgr = Arc::clone(&proxy_manager);
263 let mut shutdown_rx_refresh = shutdown_tx.subscribe();
264 let refresh_interval = config.source_refresh_interval;
265
266 let _refresh_handle = tokio::spawn(async move {
267 loop {
268 tokio::select! {
269 _ = shutdown_rx_refresh.recv() => {
270 debug!("refresh task received shutdown signal");
271 break;
272 }
273 _ = tokio::time::sleep(refresh_interval) => {
274 match refresh_proxy_mgr
275 .fetch_and_add(&refresh_sources, refresh_config.prefer_remote_dns)
276 .await
277 {
278 Ok(count) => {
279 debug!(new_count = count, "refreshed proxy sources");
280 }
281 Err(e) => {
282 warn!(error = %e, "failed to refresh proxy sources");
283 }
284 }
285 }
286 }
287 }
288 });
289
290 Ok(ScatterProxy {
291 config,
292 task_pool,
293 health,
294 rate_limiter,
295 proxy_manager,
296 throughput,
297 semaphore,
298 shutdown_tx,
299 _scheduler_handle,
300 _persist_handle,
301 _metrics_handle,
302 _refresh_handle,
303 })
304 }
305
306 pub async fn submit(&self, request: reqwest::Request) -> TaskHandle {
313 self.task_pool.submit(request).await
314 }
315
316 pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
319 self.task_pool.try_submit(request)
320 }
321
322 pub async fn submit_batch(&self, requests: Vec<reqwest::Request>) -> Vec<TaskHandle> {
325 self.task_pool.submit_batch(requests).await
326 }
327
328 pub fn try_submit_batch(
331 &self,
332 requests: Vec<reqwest::Request>,
333 ) -> Result<Vec<TaskHandle>, ScatterProxyError> {
334 self.task_pool.try_submit_batch(requests)
335 }
336
337 pub async fn submit_timeout(
344 &self,
345 request: reqwest::Request,
346 timeout: Duration,
347 ) -> Result<TaskHandle, ScatterProxyError> {
348 self.task_pool.submit_timeout(request, timeout).await
349 }
350
351 pub fn metrics(&self) -> PoolMetrics {
353 let (total, healthy, cooldown, dead) = self.proxy_manager.proxy_counts();
354
355 let total_s = self.health.total_success();
356 let total_f = self.health.total_fail();
357 let total_requests = total_s + total_f;
358 let success_rate = if total_requests > 0 {
359 total_s as f64 / total_requests as f64
360 } else {
361 0.0
362 };
363
364 PoolMetrics {
365 total_proxies: total,
366 healthy_proxies: healthy,
367 cooldown_proxies: cooldown,
368 dead_proxies: dead,
369
370 pending_tasks: self.task_pool.pending_count(),
371 delayed_tasks: self.task_pool.delayed_count(),
372 completed_tasks: self.task_pool.completed_count(),
373 failed_tasks: self.task_pool.failed_count(),
374
375 throughput_1s: self.throughput.throughput(Duration::from_secs(1)),
376 throughput_10s: self.throughput.throughput(Duration::from_secs(10)),
377 throughput_60s: self.throughput.throughput(Duration::from_secs(60)),
378
379 success_rate_1m: success_rate,
380 avg_latency_ms: self.health.avg_latency_ms(),
381
382 inflight: self.config.max_inflight - self.semaphore.available_permits(),
383 requeued_tasks: self.task_pool.requeued_count(),
384 zero_available_events: self.task_pool.zero_available_count(),
385 skipped_no_permit: self.task_pool.skipped_no_permit_count(),
386 skipped_rate_limit: self.task_pool.skipped_rate_limit_count(),
387 skipped_cooldown: self.task_pool.skipped_cooldown_count(),
388 dispatch_count: self.task_pool.dispatch_count(),
389 }
390 }
391
392 pub async fn shutdown(self) {
395 info!("initiating scatter-proxy shutdown");
396
397 let _ = self.shutdown_tx.send(());
399
400 if let Some(ref path) = self.config.state_file {
402 let health_stats = self.health.get_all_stats();
403 let proxy_states = build_proxy_states(&self.proxy_manager);
404 if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
405 warn!(error = %e, "failed to save final state during shutdown");
406 } else {
407 debug!("saved final state to disk");
408 }
409 }
410
411 info!("scatter-proxy shutdown complete");
412 }
415}
416
417fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
419 let urls = proxy_manager.all_proxy_urls();
420 let mut states = HashMap::with_capacity(urls.len());
421 for url in urls {
422 let state = proxy_manager.get_state(&url);
423 let label = match state {
424 proxy::ProxyState::Active => "Active",
425 proxy::ProxyState::Dead => "Dead",
426 proxy::ProxyState::Unknown => "Unknown",
427 };
428 states.insert(url, label.to_string());
429 }
430 states
431}