1mod classifier;
6mod config;
7mod error;
8mod health;
9mod metrics;
10mod persist;
11mod proxy;
12mod rate_limit;
13mod scheduler;
14mod score;
15mod task;
16
17pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
19pub use config::{RateLimitConfig, ScatterProxyConfig, DEFAULT_PROXY_SOURCES};
20pub use error::ScatterProxyError;
21pub use metrics::{PoolMetrics, ProxyHostStats};
22pub use task::{ScatterResponse, TaskHandle};
23
24pub use http::HeaderMap;
26pub use http::StatusCode;
27
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::sync::{broadcast, Semaphore};
32use tokio::task::JoinHandle;
33use tracing::{debug, info, warn};
34
/// Handle to a running scatter-proxy instance.
///
/// Owns the shared components created by [`ScatterProxy::new`] together with
/// the join handles of the background tasks spawned there.
// NOTE(review): `dead_code` is presumably allowed because several fields
// (e.g. `rate_limiter` and the `_…_handle` fields) are stored but never read
// through `self` in this file — confirm before removing the attribute.
#[allow(dead_code)]
pub struct ScatterProxy {
    /// Configuration shared with the scheduler and the background tasks.
    config: Arc<ScatterProxyConfig>,
    /// Task pool that the `submit*` methods delegate to; also the source of
    /// the task counters reported by `metrics()`.
    task_pool: Arc<task::TaskPool>,
    /// Health tracker; provides the success/failure totals and average
    /// latency reported by `metrics()` and is persisted to the state file.
    health: Arc<health::HealthTracker>,
    /// Rate limiter built from `config.rate_limit` and handed to the scheduler.
    rate_limiter: Arc<rate_limit::RateLimiter>,
    /// Proxy manager holding the proxy list and each proxy's state.
    proxy_manager: Arc<proxy::ProxyManager>,
    /// Throughput tracker queried over 1 s / 10 s / 60 s windows in `metrics()`.
    throughput: Arc<metrics::ThroughputTracker>,
    /// Semaphore created with `config.max_inflight` permits; the in-flight
    /// count is derived as `max_inflight - available_permits()`.
    semaphore: Arc<Semaphore>,
    /// Broadcast sender used to tell every background task to stop.
    shutdown_tx: broadcast::Sender<()>,
    /// Join handle of the scheduler task.
    _scheduler_handle: JoinHandle<()>,
    /// Join handle of the periodic state-persist task; `None` when
    /// `config.state_file` is not set.
    _persist_handle: Option<JoinHandle<()>>,
    /// Join handle of the periodic metrics-logging task.
    _metrics_handle: JoinHandle<()>,
    /// Join handle of the periodic proxy-source refresh task.
    _refresh_handle: JoinHandle<()>,
}
52
53impl ScatterProxy {
54 pub async fn new(
60 config: ScatterProxyConfig,
61 classifier: impl BodyClassifier,
62 ) -> Result<Self, ScatterProxyError> {
63 let config = Arc::new(config);
64
65 let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
67 let health = Arc::new(health::HealthTracker::new(config.health_window));
68 let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
69 let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
70 let throughput = Arc::new(metrics::ThroughputTracker::new());
71 let semaphore = Arc::new(Semaphore::new(config.max_inflight));
72 let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
73
74 let effective_sources: Vec<String> = if config.sources.is_empty() {
76 info!("no custom sources configured, using default free proxy sources");
77 DEFAULT_PROXY_SOURCES
78 .iter()
79 .map(|s| s.to_string())
80 .collect()
81 } else {
82 config.sources.clone()
83 };
84
85 let proxy_count = proxy_manager
87 .fetch_and_add(&effective_sources, config.prefer_remote_dns)
88 .await?;
89
90 if let Some(ref state_path) = config.state_file {
92 match persist::load_state(state_path).await {
93 Ok(Some(persisted)) => {
94 for (proxy_url, persisted_proxy) in &persisted.proxies {
95 for (host, stats) in &persisted_proxy.hosts {
97 health.restore(proxy_url, host, stats);
98 }
99 match persisted_proxy.state.as_str() {
101 "Active" => {
102 proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
103 }
104 "Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
105 _ => {} }
107 }
108 debug!(
109 proxies = persisted.proxies.len(),
110 "restored persisted state"
111 );
112 }
113 Ok(None) => {
114 debug!("no persisted state file found, starting fresh");
115 }
116 Err(e) => {
117 warn!(error = %e, "failed to load persisted state, starting fresh");
118 }
119 }
120 }
121
122 info!(count = proxy_count, "loaded proxies from sources");
123
124 let (shutdown_tx, _) = broadcast::channel(1);
126
127 let sched = scheduler::Scheduler::new(
131 Arc::clone(&config),
132 Arc::clone(&task_pool),
133 Arc::clone(&health),
134 Arc::clone(&rate_limiter),
135 Arc::clone(&proxy_manager),
136 Arc::clone(&classifier),
137 Arc::clone(&semaphore),
138 Arc::clone(&throughput),
139 );
140 let shutdown_rx_sched = shutdown_tx.subscribe();
141 let _scheduler_handle = tokio::spawn(async move {
142 sched.run(shutdown_rx_sched).await;
143 });
144
145 let _persist_handle = if config.state_file.is_some() {
147 let persist_config = Arc::clone(&config);
148 let persist_health = Arc::clone(&health);
149 let persist_proxy_mgr = Arc::clone(&proxy_manager);
150 let mut shutdown_rx_persist = shutdown_tx.subscribe();
151 let interval = config.state_save_interval;
152
153 Some(tokio::spawn(async move {
154 loop {
155 tokio::select! {
156 _ = shutdown_rx_persist.recv() => {
157 debug!("persist task received shutdown signal");
158 break;
159 }
160 _ = tokio::time::sleep(interval) => {
161 if let Some(ref path) = persist_config.state_file {
162 let health_stats = persist_health.get_all_stats();
163 let proxy_states = build_proxy_states(&persist_proxy_mgr);
164 if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
165 warn!(error = %e, "failed to save state");
166 } else {
167 debug!("persisted state to disk");
168 }
169 }
170 }
171 }
172 }
173 }))
174 } else {
175 None
176 };
177
178 let _metrics_config = Arc::clone(&config);
180 let metrics_task_pool = Arc::clone(&task_pool);
181 let metrics_health = Arc::clone(&health);
182 let metrics_proxy_mgr = Arc::clone(&proxy_manager);
183 let metrics_throughput = Arc::clone(&throughput);
184 let metrics_semaphore = Arc::clone(&semaphore);
185 let mut shutdown_rx_metrics = shutdown_tx.subscribe();
186 let metrics_interval = config.metrics_log_interval;
187 let max_inflight = config.max_inflight;
188
189 let _metrics_handle = tokio::spawn(async move {
190 loop {
191 tokio::select! {
192 _ = shutdown_rx_metrics.recv() => {
193 debug!("metrics task received shutdown signal");
194 break;
195 }
196 _ = tokio::time::sleep(metrics_interval) => {
197 let tp = metrics_throughput.throughput(Duration::from_secs(10));
198 let total_s = metrics_health.total_success();
199 let total_f = metrics_health.total_fail();
200 let total = total_s + total_f;
201 let success_pct = if total > 0 {
202 (total_s as f64 / total as f64) * 100.0
203 } else {
204 0.0
205 };
206 let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
207 let pending = metrics_task_pool.pending_count();
208 let delayed = metrics_task_pool.delayed_count();
209 let done = metrics_task_pool.completed_count();
210 let failed = metrics_task_pool.failed_count();
211 let requeued = metrics_task_pool.requeued_count();
212 let zero_available = metrics_task_pool.zero_available_count();
213 let skipped_no_permit = metrics_task_pool.skipped_no_permit_count();
214 let skipped_rate_limit = metrics_task_pool.skipped_rate_limit_count();
215 let skipped_cooldown = metrics_task_pool.skipped_cooldown_count();
216 let dispatches = metrics_task_pool.dispatch_count();
217 let inflight = max_inflight - metrics_semaphore.available_permits();
218 info!(
219 "throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} delayed {} done {} failed requeued={} zero_avail={} dispatch={} skip(no_permit/rate/cooldown)={}/{}/{} | inflight={}",
220 tp,
221 success_pct,
222 healthy,
223 cooldown,
224 dead,
225 pending,
226 delayed,
227 done,
228 failed,
229 requeued,
230 zero_available,
231 dispatches,
232 skipped_no_permit,
233 skipped_rate_limit,
234 skipped_cooldown,
235 inflight
236 );
237 }
238 }
239 }
240 });
241
242 let effective_sources = Arc::new(effective_sources);
244 let refresh_sources = Arc::clone(&effective_sources);
245 let refresh_config = Arc::clone(&config);
246 let refresh_proxy_mgr = Arc::clone(&proxy_manager);
247 let mut shutdown_rx_refresh = shutdown_tx.subscribe();
248 let refresh_interval = config.source_refresh_interval;
249
250 let _refresh_handle = tokio::spawn(async move {
251 loop {
252 tokio::select! {
253 _ = shutdown_rx_refresh.recv() => {
254 debug!("refresh task received shutdown signal");
255 break;
256 }
257 _ = tokio::time::sleep(refresh_interval) => {
258 match refresh_proxy_mgr
259 .fetch_and_add(&refresh_sources, refresh_config.prefer_remote_dns)
260 .await
261 {
262 Ok(count) => {
263 debug!(new_count = count, "refreshed proxy sources");
264 }
265 Err(e) => {
266 warn!(error = %e, "failed to refresh proxy sources");
267 }
268 }
269 }
270 }
271 }
272 });
273
274 Ok(ScatterProxy {
275 config,
276 task_pool,
277 health,
278 rate_limiter,
279 proxy_manager,
280 throughput,
281 semaphore,
282 shutdown_tx,
283 _scheduler_handle,
284 _persist_handle,
285 _metrics_handle,
286 _refresh_handle,
287 })
288 }
289
    /// Submits `request` to the task pool and returns its [`TaskHandle`].
    ///
    /// Thin wrapper over the task pool's `submit`.
    // NOTE(review): presumably waits for pool capacity rather than failing
    // (contrast with `try_submit`) — confirm against `TaskPool::submit`.
    pub async fn submit(&self, request: reqwest::Request) -> TaskHandle {
        self.task_pool.submit(request).await
    }
299
    /// Synchronous, fallible variant of `submit`.
    ///
    /// Thin wrapper over the task pool's `try_submit`; any error it returns is
    /// passed through unchanged.
    // NOTE(review): presumably fails immediately when the pool is at
    // capacity — confirm against `TaskPool::try_submit`.
    pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
        self.task_pool.try_submit(request)
    }
305
    /// Submits every request in `requests` to the task pool and returns the
    /// resulting handles.
    ///
    /// Thin wrapper over the task pool's `submit_batch`.
    // NOTE(review): handles are presumably returned in the same order as
    // `requests` — confirm against `TaskPool::submit_batch`.
    pub async fn submit_batch(&self, requests: Vec<reqwest::Request>) -> Vec<TaskHandle> {
        self.task_pool.submit_batch(requests).await
    }
311
    /// Synchronous, fallible variant of `submit_batch`.
    ///
    /// Thin wrapper over the task pool's `try_submit_batch`; any error it
    /// returns is passed through unchanged.
    // NOTE(review): whether a failure rejects the whole batch or only part of
    // it is not visible here — confirm against `TaskPool::try_submit_batch`.
    pub fn try_submit_batch(
        &self,
        requests: Vec<reqwest::Request>,
    ) -> Result<Vec<TaskHandle>, ScatterProxyError> {
        self.task_pool.try_submit_batch(requests)
    }
320
    /// Submits `request` to the task pool, passing `timeout` along.
    ///
    /// Thin wrapper over the task pool's `submit_timeout`; any error it
    /// returns is passed through unchanged.
    // NOTE(review): `timeout` presumably bounds how long submission may wait
    // for pool capacity, not the request itself — confirm against
    // `TaskPool::submit_timeout`.
    pub async fn submit_timeout(
        &self,
        request: reqwest::Request,
        timeout: Duration,
    ) -> Result<TaskHandle, ScatterProxyError> {
        self.task_pool.submit_timeout(request, timeout).await
    }
334
335 pub fn metrics(&self) -> PoolMetrics {
337 let (total, healthy, cooldown, dead) = self.proxy_manager.proxy_counts();
338
339 let total_s = self.health.total_success();
340 let total_f = self.health.total_fail();
341 let total_requests = total_s + total_f;
342 let success_rate = if total_requests > 0 {
343 total_s as f64 / total_requests as f64
344 } else {
345 0.0
346 };
347
348 PoolMetrics {
349 total_proxies: total,
350 healthy_proxies: healthy,
351 cooldown_proxies: cooldown,
352 dead_proxies: dead,
353
354 pending_tasks: self.task_pool.pending_count(),
355 delayed_tasks: self.task_pool.delayed_count(),
356 completed_tasks: self.task_pool.completed_count(),
357 failed_tasks: self.task_pool.failed_count(),
358
359 throughput_1s: self.throughput.throughput(Duration::from_secs(1)),
360 throughput_10s: self.throughput.throughput(Duration::from_secs(10)),
361 throughput_60s: self.throughput.throughput(Duration::from_secs(60)),
362
363 success_rate_1m: success_rate,
364 avg_latency_ms: self.health.avg_latency_ms(),
365
366 inflight: self.config.max_inflight - self.semaphore.available_permits(),
367 requeued_tasks: self.task_pool.requeued_count(),
368 zero_available_events: self.task_pool.zero_available_count(),
369 skipped_no_permit: self.task_pool.skipped_no_permit_count(),
370 skipped_rate_limit: self.task_pool.skipped_rate_limit_count(),
371 skipped_cooldown: self.task_pool.skipped_cooldown_count(),
372 dispatch_count: self.task_pool.dispatch_count(),
373 }
374 }
375
376 pub async fn shutdown(self) {
379 info!("initiating scatter-proxy shutdown");
380
381 let _ = self.shutdown_tx.send(());
383
384 if let Some(ref path) = self.config.state_file {
386 let health_stats = self.health.get_all_stats();
387 let proxy_states = build_proxy_states(&self.proxy_manager);
388 if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
389 warn!(error = %e, "failed to save final state during shutdown");
390 } else {
391 debug!("saved final state to disk");
392 }
393 }
394
395 info!("scatter-proxy shutdown complete");
396 }
399}
400
401fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
403 let urls = proxy_manager.all_proxy_urls();
404 let mut states = HashMap::with_capacity(urls.len());
405 for url in urls {
406 let state = proxy_manager.get_state(&url);
407 let label = match state {
408 proxy::ProxyState::Active => "Active",
409 proxy::ProxyState::Dead => "Dead",
410 proxy::ProxyState::Unknown => "Unknown",
411 };
412 states.insert(url, label.to_string());
413 }
414 states
415}