Skip to main content

scatter_proxy/
lib.rs

//! scatter-proxy — async request scheduler for unreliable SOCKS5 proxies.
//!
//! Multi-path race for maximum throughput.
4
5mod classifier;
6mod config;
7mod error;
8mod health;
9mod metrics;
10mod persist;
11mod proxy;
12mod rate_limit;
13mod scheduler;
14mod score;
15mod task;
16
17// Re-exports (public API surface)
18pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
19pub use config::{RateLimitConfig, ScatterProxyConfig, DEFAULT_PROXY_SOURCES};
20pub use error::ScatterProxyError;
21pub use metrics::{PoolMetrics, ProxyHostStats};
22pub use task::{ScatterResponse, TaskHandle};
23
24// Also re-export useful types for the classifier trait
25pub use http::HeaderMap;
26pub use http::StatusCode;
27
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::sync::{broadcast, Semaphore};
32use tokio::task::JoinHandle;
33use tracing::{debug, info, warn};
34
35/// The main entry point. Manages proxy pool, scheduling, and background tasks.
36#[allow(dead_code)]
37pub struct ScatterProxy {
38    config: Arc<ScatterProxyConfig>,
39    task_pool: Arc<task::TaskPool>,
40    health: Arc<health::HealthTracker>,
41    rate_limiter: Arc<rate_limit::RateLimiter>,
42    proxy_manager: Arc<proxy::ProxyManager>,
43    throughput: Arc<metrics::ThroughputTracker>,
44    semaphore: Arc<Semaphore>,
45    shutdown_tx: broadcast::Sender<()>,
46    // Background task handles
47    _scheduler_handle: JoinHandle<()>,
48    _persist_handle: Option<JoinHandle<()>>,
49    _metrics_handle: JoinHandle<()>,
50    _refresh_handle: JoinHandle<()>,
51}
52
53impl ScatterProxy {
54    /// Create a new `ScatterProxy` from config and a body classifier.
55    ///
56    /// This is an async constructor that fetches proxies, restores persisted
57    /// state, and spawns all background tasks (scheduler, persistence, metrics
58    /// logging, source refresh).
59    pub async fn new(
60        config: ScatterProxyConfig,
61        classifier: impl BodyClassifier,
62    ) -> Result<Self, ScatterProxyError> {
63        let config = Arc::new(config);
64
65        // Build all Arc-wrapped components from config.
66        let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
67        let health = Arc::new(health::HealthTracker::new(config.health_window));
68        let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
69        let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
70        let throughput = Arc::new(metrics::ThroughputTracker::new());
71        let semaphore = Arc::new(Semaphore::new(config.max_inflight));
72        let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
73
74        // Determine sources: use configured sources, or fall back to defaults.
75        let effective_sources: Vec<String> = if config.sources.is_empty() {
76            info!("no custom sources configured, using default free proxy sources");
77            DEFAULT_PROXY_SOURCES
78                .iter()
79                .map(|s| s.to_string())
80                .collect()
81        } else {
82            config.sources.clone()
83        };
84
85        // Fetch proxies from sources.
86        let proxy_count = proxy_manager
87            .fetch_and_add(&effective_sources, config.prefer_remote_dns)
88            .await?;
89
90        // Restore persisted state if configured.
91        if let Some(ref state_path) = config.state_file {
92            match persist::load_state(state_path).await {
93                Ok(Some(persisted)) => {
94                    for (proxy_url, persisted_proxy) in &persisted.proxies {
95                        // Restore per-host health data.
96                        for (host, stats) in &persisted_proxy.hosts {
97                            health.restore(proxy_url, host, stats);
98                        }
99                        // Restore proxy state.
100                        match persisted_proxy.state.as_str() {
101                            "Active" => {
102                                proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
103                            }
104                            "Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
105                            _ => {} // Unknown or unrecognised — leave as default
106                        }
107                    }
108                    debug!(
109                        proxies = persisted.proxies.len(),
110                        "restored persisted state"
111                    );
112                }
113                Ok(None) => {
114                    debug!("no persisted state file found, starting fresh");
115                }
116                Err(e) => {
117                    warn!(error = %e, "failed to load persisted state, starting fresh");
118                }
119            }
120        }
121
122        info!(count = proxy_count, "loaded proxies from sources");
123
124        // Shutdown broadcast channel.
125        let (shutdown_tx, _) = broadcast::channel(1);
126
127        // ── Spawn background tasks ──────────────────────────────────────
128
129        // (a) Scheduler
130        let sched = scheduler::Scheduler::new(
131            Arc::clone(&config),
132            Arc::clone(&task_pool),
133            Arc::clone(&health),
134            Arc::clone(&rate_limiter),
135            Arc::clone(&proxy_manager),
136            Arc::clone(&classifier),
137            Arc::clone(&semaphore),
138            Arc::clone(&throughput),
139        );
140        let shutdown_rx_sched = shutdown_tx.subscribe();
141        let _scheduler_handle = tokio::spawn(async move {
142            sched.run(shutdown_rx_sched).await;
143        });
144
145        // (b) State persistence (only if state_file is configured)
146        let _persist_handle = if config.state_file.is_some() {
147            let persist_config = Arc::clone(&config);
148            let persist_health = Arc::clone(&health);
149            let persist_proxy_mgr = Arc::clone(&proxy_manager);
150            let mut shutdown_rx_persist = shutdown_tx.subscribe();
151            let interval = config.state_save_interval;
152
153            Some(tokio::spawn(async move {
154                loop {
155                    tokio::select! {
156                        _ = shutdown_rx_persist.recv() => {
157                            debug!("persist task received shutdown signal");
158                            break;
159                        }
160                        _ = tokio::time::sleep(interval) => {
161                            if let Some(ref path) = persist_config.state_file {
162                                let health_stats = persist_health.get_all_stats();
163                                let proxy_states = build_proxy_states(&persist_proxy_mgr);
164                                if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
165                                    warn!(error = %e, "failed to save state");
166                                } else {
167                                    debug!("persisted state to disk");
168                                }
169                            }
170                        }
171                    }
172                }
173            }))
174        } else {
175            None
176        };
177
178        // (c) Metrics logging
179        let _metrics_config = Arc::clone(&config);
180        let metrics_task_pool = Arc::clone(&task_pool);
181        let metrics_health = Arc::clone(&health);
182        let metrics_proxy_mgr = Arc::clone(&proxy_manager);
183        let metrics_throughput = Arc::clone(&throughput);
184        let metrics_semaphore = Arc::clone(&semaphore);
185        let mut shutdown_rx_metrics = shutdown_tx.subscribe();
186        let metrics_interval = config.metrics_log_interval;
187        let max_inflight = config.max_inflight;
188
189        let _metrics_handle = tokio::spawn(async move {
190            loop {
191                tokio::select! {
192                    _ = shutdown_rx_metrics.recv() => {
193                        debug!("metrics task received shutdown signal");
194                        break;
195                    }
196                    _ = tokio::time::sleep(metrics_interval) => {
197                        let tp = metrics_throughput.throughput(Duration::from_secs(10));
198                        let total_s = metrics_health.total_success();
199                        let total_f = metrics_health.total_fail();
200                        let total = total_s + total_f;
201                        let success_pct = if total > 0 {
202                            (total_s as f64 / total as f64) * 100.0
203                        } else {
204                            0.0
205                        };
206                        let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
207                        let pending = metrics_task_pool.pending_count();
208                        let delayed = metrics_task_pool.delayed_count();
209                        let done = metrics_task_pool.completed_count();
210                        let failed = metrics_task_pool.failed_count();
211                        let requeued = metrics_task_pool.requeued_count();
212                        let zero_available = metrics_task_pool.zero_available_count();
213                        let skipped_no_permit = metrics_task_pool.skipped_no_permit_count();
214                        let skipped_rate_limit = metrics_task_pool.skipped_rate_limit_count();
215                        let skipped_cooldown = metrics_task_pool.skipped_cooldown_count();
216                        let dispatches = metrics_task_pool.dispatch_count();
217                        let inflight = max_inflight - metrics_semaphore.available_permits();
218                        info!(
219                            "throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} delayed {} done {} failed requeued={} zero_avail={} dispatch={} skip(no_permit/rate/cooldown)={}/{}/{} | inflight={}",
220                            tp,
221                            success_pct,
222                            healthy,
223                            cooldown,
224                            dead,
225                            pending,
226                            delayed,
227                            done,
228                            failed,
229                            requeued,
230                            zero_available,
231                            dispatches,
232                            skipped_no_permit,
233                            skipped_rate_limit,
234                            skipped_cooldown,
235                            inflight
236                        );
237                    }
238                }
239            }
240        });
241
242        // (d) Source refresh
243        let effective_sources = Arc::new(effective_sources);
244        let refresh_sources = Arc::clone(&effective_sources);
245        let refresh_config = Arc::clone(&config);
246        let refresh_proxy_mgr = Arc::clone(&proxy_manager);
247        let mut shutdown_rx_refresh = shutdown_tx.subscribe();
248        let refresh_interval = config.source_refresh_interval;
249
250        let _refresh_handle = tokio::spawn(async move {
251            loop {
252                tokio::select! {
253                    _ = shutdown_rx_refresh.recv() => {
254                        debug!("refresh task received shutdown signal");
255                        break;
256                    }
257                    _ = tokio::time::sleep(refresh_interval) => {
258                        match refresh_proxy_mgr
259                            .fetch_and_add(&refresh_sources, refresh_config.prefer_remote_dns)
260                            .await
261                        {
262                            Ok(count) => {
263                                debug!(new_count = count, "refreshed proxy sources");
264                            }
265                            Err(e) => {
266                                warn!(error = %e, "failed to refresh proxy sources");
267                            }
268                        }
269                    }
270                }
271            }
272        });
273
274        Ok(ScatterProxy {
275            config,
276            task_pool,
277            health,
278            rate_limiter,
279            proxy_manager,
280            throughput,
281            semaphore,
282            shutdown_tx,
283            _scheduler_handle,
284            _persist_handle,
285            _metrics_handle,
286            _refresh_handle,
287        })
288    }
289
290    /// Submit a single request for proxied execution.
291    ///
292    /// Blocks until the pool has capacity, then returns a [`TaskHandle`] whose
293    /// `.await` blocks until a successful proxied response is obtained.  The
294    /// scheduler retries internally forever — use [`TaskHandle::with_timeout`]
295    /// or wrap with `tokio::time::timeout` to impose a caller-side deadline.
296    pub async fn submit(&self, request: reqwest::Request) -> TaskHandle {
297        self.task_pool.submit(request).await
298    }
299
300    /// Non-blocking submit.  Returns `Err(PoolFull)` immediately when the pool
301    /// is at capacity instead of blocking.
302    pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
303        self.task_pool.try_submit(request)
304    }
305
306    /// Submit a batch of requests for proxied execution, blocking until the
307    /// pool has capacity for each request sequentially.
308    pub async fn submit_batch(&self, requests: Vec<reqwest::Request>) -> Vec<TaskHandle> {
309        self.task_pool.submit_batch(requests).await
310    }
311
312    /// Non-blocking atomic batch submit.  If the pool doesn't have enough
313    /// capacity for the entire batch, the whole batch is rejected.
314    pub fn try_submit_batch(
315        &self,
316        requests: Vec<reqwest::Request>,
317    ) -> Result<Vec<TaskHandle>, ScatterProxyError> {
318        self.task_pool.try_submit_batch(requests)
319    }
320
321    /// Submit with a deadline on the *submission* step itself.
322    ///
323    /// Blocks up to `timeout` waiting for pool capacity.  Returns
324    /// `Err(Timeout)` if no slot opens in time.  Once a slot is obtained the
325    /// returned `TaskHandle` blocks indefinitely (retry forever) until a
326    /// response arrives — use [`TaskHandle::with_timeout`] to bound that step.
327    pub async fn submit_timeout(
328        &self,
329        request: reqwest::Request,
330        timeout: Duration,
331    ) -> Result<TaskHandle, ScatterProxyError> {
332        self.task_pool.submit_timeout(request, timeout).await
333    }
334
335    /// Build a [`PoolMetrics`] snapshot from all internal components.
336    pub fn metrics(&self) -> PoolMetrics {
337        let (total, healthy, cooldown, dead) = self.proxy_manager.proxy_counts();
338
339        let total_s = self.health.total_success();
340        let total_f = self.health.total_fail();
341        let total_requests = total_s + total_f;
342        let success_rate = if total_requests > 0 {
343            total_s as f64 / total_requests as f64
344        } else {
345            0.0
346        };
347
348        PoolMetrics {
349            total_proxies: total,
350            healthy_proxies: healthy,
351            cooldown_proxies: cooldown,
352            dead_proxies: dead,
353
354            pending_tasks: self.task_pool.pending_count(),
355            delayed_tasks: self.task_pool.delayed_count(),
356            completed_tasks: self.task_pool.completed_count(),
357            failed_tasks: self.task_pool.failed_count(),
358
359            throughput_1s: self.throughput.throughput(Duration::from_secs(1)),
360            throughput_10s: self.throughput.throughput(Duration::from_secs(10)),
361            throughput_60s: self.throughput.throughput(Duration::from_secs(60)),
362
363            success_rate_1m: success_rate,
364            avg_latency_ms: self.health.avg_latency_ms(),
365
366            inflight: self.config.max_inflight - self.semaphore.available_permits(),
367            requeued_tasks: self.task_pool.requeued_count(),
368            zero_available_events: self.task_pool.zero_available_count(),
369            skipped_no_permit: self.task_pool.skipped_no_permit_count(),
370            skipped_rate_limit: self.task_pool.skipped_rate_limit_count(),
371            skipped_cooldown: self.task_pool.skipped_cooldown_count(),
372            dispatch_count: self.task_pool.dispatch_count(),
373        }
374    }
375
376    /// Gracefully shut down the proxy: signal all background tasks, persist
377    /// final state, and drop resources.
378    pub async fn shutdown(self) {
379        info!("initiating scatter-proxy shutdown");
380
381        // Signal all background tasks to stop.
382        let _ = self.shutdown_tx.send(());
383
384        // Final state save if configured.
385        if let Some(ref path) = self.config.state_file {
386            let health_stats = self.health.get_all_stats();
387            let proxy_states = build_proxy_states(&self.proxy_manager);
388            if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
389                warn!(error = %e, "failed to save final state during shutdown");
390            } else {
391                debug!("saved final state to disk");
392            }
393        }
394
395        info!("scatter-proxy shutdown complete");
396        // JoinHandles are dropped here, which detaches the background tasks
397        // (they will exit via the shutdown signal).
398    }
399}
400
401/// Build a map of proxy URL → state string for persistence.
402fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
403    let urls = proxy_manager.all_proxy_urls();
404    let mut states = HashMap::with_capacity(urls.len());
405    for url in urls {
406        let state = proxy_manager.get_state(&url);
407        let label = match state {
408            proxy::ProxyState::Active => "Active",
409            proxy::ProxyState::Dead => "Dead",
410            proxy::ProxyState::Unknown => "Unknown",
411        };
412        states.insert(url, label.to_string());
413    }
414    states
415}