// scatter_proxy/lib.rs
1//! scatter-proxy — async request scheduler for unreliable SOCKS5 proxies.
2//!
3//! Multi-path race for maximum throughput.
4
5mod classifier;
6mod config;
7mod error;
8mod health;
9mod metrics;
10mod persist;
11mod proxy;
12mod rate_limit;
13mod router;
14mod scheduler;
15mod score;
16mod task;
17
18// Re-exports (public API surface)
19pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
20pub use config::{RateLimitConfig, ScatterProxyConfig, DEFAULT_PROXY_SOURCES};
21pub use error::ScatterProxyError;
22pub use metrics::{PoolMetrics, ProxyHostStats};
23pub use router::ScatterProxyRouter;
24pub use task::{ScatterResponse, TaskHandle};
25
26// Also re-export useful types for the classifier trait
27pub use http::HeaderMap;
28pub use http::StatusCode;
29
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33use tokio::sync::{broadcast, Semaphore};
34use tokio::task::JoinHandle;
35use tracing::{debug, info, warn};
36
/// The main entry point. Manages proxy pool, scheduling, and background tasks.
///
/// Built via [`ScatterProxy::new`] / [`ScatterProxy::new_arc`]. Prefer
/// [`ScatterProxy::shutdown`] over plain drop: dropping the handles detaches
/// the background tasks without persisting final state.
// Several fields are held primarily to keep components alive and to support
// `metrics()`/`shutdown()`, hence the dead_code allowance.
#[allow(dead_code)]
pub struct ScatterProxy {
    // Shared, immutable runtime configuration.
    config: Arc<ScatterProxyConfig>,
    // Queue of submitted tasks; all `submit*` methods delegate here.
    task_pool: Arc<task::TaskPool>,
    // Per-proxy, per-host success/failure statistics.
    health: Arc<health::HealthTracker>,
    // Rate limiter shared with the scheduler.
    rate_limiter: Arc<rate_limit::RateLimiter>,
    // Tracks proxy endpoints and their Active/Dead/Unknown state.
    proxy_manager: Arc<proxy::ProxyManager>,
    // Rolling request-throughput tracker (queried over 1s/10s/60s windows).
    throughput: Arc<metrics::ThroughputTracker>,
    // Caps concurrent in-flight requests at `config.max_inflight` permits.
    semaphore: Arc<Semaphore>,
    // Broadcast channel used to signal all background tasks to stop.
    shutdown_tx: broadcast::Sender<()>,
    // Background task handles — never awaited (hence the `_` prefix); the
    // tasks exit via the shutdown broadcast.
    _scheduler_handle: JoinHandle<()>,
    _persist_handle: Option<JoinHandle<()>>,
    _metrics_handle: JoinHandle<()>,
    _refresh_handle: JoinHandle<()>,
}
54
55impl ScatterProxy {
56    /// Create a new `ScatterProxy` from config and a body classifier.
57    ///
58    /// This is an async constructor that fetches proxies, restores persisted
59    /// state, and spawns all background tasks (scheduler, persistence, metrics
60    /// logging, source refresh).
61    pub async fn new(
62        config: ScatterProxyConfig,
63        classifier: impl BodyClassifier,
64    ) -> Result<Self, ScatterProxyError> {
65        Self::new_arc(config, Arc::new(classifier)).await
66    }
67
68    /// Internal constructor that accepts a pre-built `Arc<dyn BodyClassifier>`.
69    ///
70    /// Used by [`ScatterProxyRouter`] to share a single classifier instance
71    /// across all per-host pools without requiring `Clone` on the classifier.
72    pub(crate) async fn new_arc(
73        config: ScatterProxyConfig,
74        classifier: Arc<dyn BodyClassifier>,
75    ) -> Result<Self, ScatterProxyError> {
76        let metrics_name = config.name.clone();
77        let config = Arc::new(config);
78
79        // Build all Arc-wrapped components from config.
80        let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
81        let health = Arc::new(health::HealthTracker::new(config.health_window));
82        let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
83        let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
84        let throughput = Arc::new(metrics::ThroughputTracker::new());
85        let semaphore = Arc::new(Semaphore::new(config.max_inflight));
86
87        // Determine sources: use configured sources, or fall back to defaults.
88        let effective_sources: Vec<String> = if config.sources.is_empty() {
89            info!("no custom sources configured, using default free proxy sources");
90            DEFAULT_PROXY_SOURCES
91                .iter()
92                .map(|s| s.to_string())
93                .collect()
94        } else {
95            config.sources.clone()
96        };
97
98        // Fetch proxies from sources.
99        let proxy_count = proxy_manager
100            .fetch_and_add(&effective_sources, config.prefer_remote_dns)
101            .await?;
102
103        // Restore persisted state if configured.
104        if let Some(ref state_path) = config.state_file {
105            match persist::load_state(state_path).await {
106                Ok(Some(persisted)) => {
107                    for (proxy_url, persisted_proxy) in &persisted.proxies {
108                        // Restore per-host health data.
109                        for (host, stats) in &persisted_proxy.hosts {
110                            health.restore(proxy_url, host, stats);
111                        }
112                        // Restore proxy state.
113                        match persisted_proxy.state.as_str() {
114                            "Active" => {
115                                proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
116                            }
117                            "Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
118                            _ => {} // Unknown or unrecognised — leave as default
119                        }
120                    }
121                    debug!(
122                        proxies = persisted.proxies.len(),
123                        "restored persisted state"
124                    );
125                }
126                Ok(None) => {
127                    debug!("no persisted state file found, starting fresh");
128                }
129                Err(e) => {
130                    warn!(error = %e, "failed to load persisted state, starting fresh");
131                }
132            }
133        }
134
135        info!(count = proxy_count, "loaded proxies from sources");
136
137        // Shutdown broadcast channel.
138        let (shutdown_tx, _) = broadcast::channel(1);
139
140        // ── Spawn background tasks ──────────────────────────────────────
141
142        // (a) Scheduler
143        let sched = scheduler::Scheduler::new(
144            Arc::clone(&config),
145            Arc::clone(&task_pool),
146            Arc::clone(&health),
147            Arc::clone(&rate_limiter),
148            Arc::clone(&proxy_manager),
149            Arc::clone(&classifier),
150            Arc::clone(&semaphore),
151            Arc::clone(&throughput),
152        );
153        let shutdown_rx_sched = shutdown_tx.subscribe();
154        let _scheduler_handle = tokio::spawn(async move {
155            sched.run(shutdown_rx_sched).await;
156        });
157
158        // (b) State persistence (only if state_file is configured)
159        let _persist_handle = if config.state_file.is_some() {
160            let persist_config = Arc::clone(&config);
161            let persist_health = Arc::clone(&health);
162            let persist_proxy_mgr = Arc::clone(&proxy_manager);
163            let mut shutdown_rx_persist = shutdown_tx.subscribe();
164            let interval = config.state_save_interval;
165
166            Some(tokio::spawn(async move {
167                loop {
168                    tokio::select! {
169                        _ = shutdown_rx_persist.recv() => {
170                            debug!("persist task received shutdown signal");
171                            break;
172                        }
173                        _ = tokio::time::sleep(interval) => {
174                            if let Some(ref path) = persist_config.state_file {
175                                let health_stats = persist_health.get_all_stats();
176                                let proxy_states = build_proxy_states(&persist_proxy_mgr);
177                                if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
178                                    warn!(error = %e, "failed to save state");
179                                } else {
180                                    debug!("persisted state to disk");
181                                }
182                            }
183                        }
184                    }
185                }
186            }))
187        } else {
188            None
189        };
190
191        // (c) Metrics logging
192        let _metrics_config = Arc::clone(&config);
193        let metrics_task_pool = Arc::clone(&task_pool);
194        let metrics_health = Arc::clone(&health);
195        let metrics_proxy_mgr = Arc::clone(&proxy_manager);
196        let metrics_throughput = Arc::clone(&throughput);
197        let metrics_semaphore = Arc::clone(&semaphore);
198        let mut shutdown_rx_metrics = shutdown_tx.subscribe();
199        let metrics_interval = config.metrics_log_interval;
200        let max_inflight = config.max_inflight;
201
202        let _metrics_handle = tokio::spawn(async move {
203            let prefix = metrics_name
204                .as_deref()
205                .map_or_else(String::new, |n| format!("[{n}] "));
206            loop {
207                tokio::select! {
208                    _ = shutdown_rx_metrics.recv() => {
209                        debug!("metrics task received shutdown signal");
210                        break;
211                    }
212                    _ = tokio::time::sleep(metrics_interval) => {
213                        let tp = metrics_throughput.throughput(Duration::from_secs(10));
214                        let total_s = metrics_health.total_success();
215                        let total_f = metrics_health.total_fail();
216                        let total = total_s + total_f;
217                        let success_pct = if total > 0 {
218                            (total_s as f64 / total as f64) * 100.0
219                        } else {
220                            0.0
221                        };
222                        let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
223                        let pending = metrics_task_pool.pending_count();
224                        let delayed = metrics_task_pool.delayed_count();
225                        let done = metrics_task_pool.completed_count();
226                        let failed = metrics_task_pool.failed_count();
227                        let requeued = metrics_task_pool.requeued_count();
228                        let zero_available = metrics_task_pool.zero_available_count();
229                        let skipped_no_permit = metrics_task_pool.skipped_no_permit_count();
230                        let skipped_rate_limit = metrics_task_pool.skipped_rate_limit_count();
231                        let skipped_cooldown = metrics_task_pool.skipped_cooldown_count();
232                        let dispatches = metrics_task_pool.dispatch_count();
233                        let inflight = max_inflight - metrics_semaphore.available_permits();
234                        info!(
235                            "{prefix}throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} delayed {} done {} failed requeued={} zero_avail={} dispatch={} skip(no_permit/rate/cooldown)={}/{}/{} | inflight={}",
236                            tp,
237                            success_pct,
238                            healthy,
239                            cooldown,
240                            dead,
241                            pending,
242                            delayed,
243                            done,
244                            failed,
245                            requeued,
246                            zero_available,
247                            dispatches,
248                            skipped_no_permit,
249                            skipped_rate_limit,
250                            skipped_cooldown,
251                            inflight
252                        );
253                    }
254                }
255            }
256        });
257
258        // (d) Source refresh
259        let effective_sources = Arc::new(effective_sources);
260        let refresh_sources = Arc::clone(&effective_sources);
261        let refresh_config = Arc::clone(&config);
262        let refresh_proxy_mgr = Arc::clone(&proxy_manager);
263        let mut shutdown_rx_refresh = shutdown_tx.subscribe();
264        let refresh_interval = config.source_refresh_interval;
265
266        let _refresh_handle = tokio::spawn(async move {
267            loop {
268                tokio::select! {
269                    _ = shutdown_rx_refresh.recv() => {
270                        debug!("refresh task received shutdown signal");
271                        break;
272                    }
273                    _ = tokio::time::sleep(refresh_interval) => {
274                        match refresh_proxy_mgr
275                            .fetch_and_add(&refresh_sources, refresh_config.prefer_remote_dns)
276                            .await
277                        {
278                            Ok(count) => {
279                                debug!(new_count = count, "refreshed proxy sources");
280                            }
281                            Err(e) => {
282                                warn!(error = %e, "failed to refresh proxy sources");
283                            }
284                        }
285                    }
286                }
287            }
288        });
289
290        Ok(ScatterProxy {
291            config,
292            task_pool,
293            health,
294            rate_limiter,
295            proxy_manager,
296            throughput,
297            semaphore,
298            shutdown_tx,
299            _scheduler_handle,
300            _persist_handle,
301            _metrics_handle,
302            _refresh_handle,
303        })
304    }
305
306    /// Submit a single request for proxied execution.
307    ///
308    /// Blocks until the pool has capacity, then returns a [`TaskHandle`] whose
309    /// `.await` blocks until a successful proxied response is obtained.  The
310    /// scheduler retries internally forever — use [`TaskHandle::with_timeout`]
311    /// or wrap with `tokio::time::timeout` to impose a caller-side deadline.
312    pub async fn submit(&self, request: reqwest::Request) -> TaskHandle {
313        self.task_pool.submit(request).await
314    }
315
316    /// Non-blocking submit.  Returns `Err(PoolFull)` immediately when the pool
317    /// is at capacity instead of blocking.
318    pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
319        self.task_pool.try_submit(request)
320    }
321
322    /// Submit a batch of requests for proxied execution, blocking until the
323    /// pool has capacity for each request sequentially.
324    pub async fn submit_batch(&self, requests: Vec<reqwest::Request>) -> Vec<TaskHandle> {
325        self.task_pool.submit_batch(requests).await
326    }
327
328    /// Non-blocking atomic batch submit.  If the pool doesn't have enough
329    /// capacity for the entire batch, the whole batch is rejected.
330    pub fn try_submit_batch(
331        &self,
332        requests: Vec<reqwest::Request>,
333    ) -> Result<Vec<TaskHandle>, ScatterProxyError> {
334        self.task_pool.try_submit_batch(requests)
335    }
336
337    /// Submit with a deadline on the *submission* step itself.
338    ///
339    /// Blocks up to `timeout` waiting for pool capacity.  Returns
340    /// `Err(Timeout)` if no slot opens in time.  Once a slot is obtained the
341    /// returned `TaskHandle` blocks indefinitely (retry forever) until a
342    /// response arrives — use [`TaskHandle::with_timeout`] to bound that step.
343    pub async fn submit_timeout(
344        &self,
345        request: reqwest::Request,
346        timeout: Duration,
347    ) -> Result<TaskHandle, ScatterProxyError> {
348        self.task_pool.submit_timeout(request, timeout).await
349    }
350
351    /// Build a [`PoolMetrics`] snapshot from all internal components.
352    pub fn metrics(&self) -> PoolMetrics {
353        let (total, healthy, cooldown, dead) = self.proxy_manager.proxy_counts();
354
355        let total_s = self.health.total_success();
356        let total_f = self.health.total_fail();
357        let total_requests = total_s + total_f;
358        let success_rate = if total_requests > 0 {
359            total_s as f64 / total_requests as f64
360        } else {
361            0.0
362        };
363
364        PoolMetrics {
365            total_proxies: total,
366            healthy_proxies: healthy,
367            cooldown_proxies: cooldown,
368            dead_proxies: dead,
369
370            pending_tasks: self.task_pool.pending_count(),
371            delayed_tasks: self.task_pool.delayed_count(),
372            completed_tasks: self.task_pool.completed_count(),
373            failed_tasks: self.task_pool.failed_count(),
374
375            throughput_1s: self.throughput.throughput(Duration::from_secs(1)),
376            throughput_10s: self.throughput.throughput(Duration::from_secs(10)),
377            throughput_60s: self.throughput.throughput(Duration::from_secs(60)),
378
379            success_rate_1m: success_rate,
380            avg_latency_ms: self.health.avg_latency_ms(),
381
382            inflight: self.config.max_inflight - self.semaphore.available_permits(),
383            requeued_tasks: self.task_pool.requeued_count(),
384            zero_available_events: self.task_pool.zero_available_count(),
385            skipped_no_permit: self.task_pool.skipped_no_permit_count(),
386            skipped_rate_limit: self.task_pool.skipped_rate_limit_count(),
387            skipped_cooldown: self.task_pool.skipped_cooldown_count(),
388            dispatch_count: self.task_pool.dispatch_count(),
389        }
390    }
391
392    /// Gracefully shut down the proxy: signal all background tasks, persist
393    /// final state, and drop resources.
394    pub async fn shutdown(self) {
395        info!("initiating scatter-proxy shutdown");
396
397        // Signal all background tasks to stop.
398        let _ = self.shutdown_tx.send(());
399
400        // Final state save if configured.
401        if let Some(ref path) = self.config.state_file {
402            let health_stats = self.health.get_all_stats();
403            let proxy_states = build_proxy_states(&self.proxy_manager);
404            if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
405                warn!(error = %e, "failed to save final state during shutdown");
406            } else {
407                debug!("saved final state to disk");
408            }
409        }
410
411        info!("scatter-proxy shutdown complete");
412        // JoinHandles are dropped here, which detaches the background tasks
413        // (they will exit via the shutdown signal).
414    }
415}
416
417/// Build a map of proxy URL → state string for persistence.
418fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
419    let urls = proxy_manager.all_proxy_urls();
420    let mut states = HashMap::with_capacity(urls.len());
421    for url in urls {
422        let state = proxy_manager.get_state(&url);
423        let label = match state {
424            proxy::ProxyState::Active => "Active",
425            proxy::ProxyState::Dead => "Dead",
426            proxy::ProxyState::Unknown => "Unknown",
427        };
428        states.insert(url, label.to_string());
429    }
430    states
431}