Skip to main content

scatter_proxy/
lib.rs

//! scatter-proxy — async request scheduler for unreliable SOCKS5 proxies.
//!
//! Multi-path race for maximum throughput.
4
5mod circuit_breaker;
6mod classifier;
7mod config;
8mod error;
9mod health;
10mod metrics;
11mod persist;
12mod proxy;
13mod rate_limit;
14mod scheduler;
15mod score;
16mod task;
17
18// Re-exports (public API surface)
19pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
20pub use config::{RateLimitConfig, ScatterProxyConfig, DEFAULT_PROXY_SOURCES};
21pub use error::ScatterProxyError;
22pub use metrics::{PoolMetrics, ProxyHostStats};
23pub use task::{ScatterResponse, TaskHandle};
24
25// Also re-export useful types for the classifier trait
26pub use http::HeaderMap;
27pub use http::StatusCode;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::Duration;
32use tokio::sync::{broadcast, Semaphore};
33use tokio::task::JoinHandle;
34use tracing::{debug, info, warn};
35
36/// The main entry point. Manages proxy pool, scheduling, and background tasks.
37#[allow(dead_code)]
38pub struct ScatterProxy {
39    config: Arc<ScatterProxyConfig>,
40    task_pool: Arc<task::TaskPool>,
41    health: Arc<health::HealthTracker>,
42    rate_limiter: Arc<rate_limit::RateLimiter>,
43    circuit_breakers: Arc<circuit_breaker::CircuitBreakerManager>,
44    proxy_manager: Arc<proxy::ProxyManager>,
45    throughput: Arc<metrics::ThroughputTracker>,
46    semaphore: Arc<Semaphore>,
47    shutdown_tx: broadcast::Sender<()>,
48    // Background task handles
49    _scheduler_handle: JoinHandle<()>,
50    _persist_handle: Option<JoinHandle<()>>,
51    _metrics_handle: JoinHandle<()>,
52    _refresh_handle: JoinHandle<()>,
53}
54
55impl ScatterProxy {
56    /// Create a new `ScatterProxy` from config and a body classifier.
57    ///
58    /// This is an async constructor that fetches proxies, restores persisted
59    /// state, and spawns all background tasks (scheduler, persistence, metrics
60    /// logging, source refresh).
61    pub async fn new(
62        config: ScatterProxyConfig,
63        classifier: impl BodyClassifier,
64    ) -> Result<Self, ScatterProxyError> {
65        let config = Arc::new(config);
66
67        // Build all Arc-wrapped components from config.
68        let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
69        let health = Arc::new(health::HealthTracker::new(config.health_window));
70        let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
71        let circuit_breakers = Arc::new(circuit_breaker::CircuitBreakerManager::new(
72            config.circuit_breaker_threshold,
73            config.circuit_breaker_probe_interval,
74        ));
75        let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
76        let throughput = Arc::new(metrics::ThroughputTracker::new());
77        let semaphore = Arc::new(Semaphore::new(config.max_inflight));
78        let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
79
80        // Determine sources: use configured sources, or fall back to defaults.
81        let effective_sources: Vec<String> = if config.sources.is_empty() {
82            info!("no custom sources configured, using default free proxy sources");
83            DEFAULT_PROXY_SOURCES
84                .iter()
85                .map(|s| s.to_string())
86                .collect()
87        } else {
88            config.sources.clone()
89        };
90
91        // Fetch proxies from sources.
92        let proxy_count = proxy_manager
93            .fetch_and_add(&effective_sources, config.prefer_remote_dns)
94            .await?;
95
96        // Restore persisted state if configured.
97        if let Some(ref state_path) = config.state_file {
98            match persist::load_state(state_path).await {
99                Ok(Some(persisted)) => {
100                    for (proxy_url, persisted_proxy) in &persisted.proxies {
101                        // Restore per-host health data.
102                        for (host, stats) in &persisted_proxy.hosts {
103                            health.restore(proxy_url, host, stats);
104                        }
105                        // Restore proxy state.
106                        match persisted_proxy.state.as_str() {
107                            "Active" => {
108                                proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
109                            }
110                            "Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
111                            _ => {} // Unknown or unrecognised — leave as default
112                        }
113                    }
114                    debug!(
115                        proxies = persisted.proxies.len(),
116                        "restored persisted state"
117                    );
118                }
119                Ok(None) => {
120                    debug!("no persisted state file found, starting fresh");
121                }
122                Err(e) => {
123                    warn!(error = %e, "failed to load persisted state, starting fresh");
124                }
125            }
126        }
127
128        info!(count = proxy_count, "loaded proxies from sources");
129
130        // Shutdown broadcast channel.
131        let (shutdown_tx, _) = broadcast::channel(1);
132
133        // ── Spawn background tasks ──────────────────────────────────────
134
135        // (a) Scheduler
136        let sched = scheduler::Scheduler::new(
137            Arc::clone(&config),
138            Arc::clone(&task_pool),
139            Arc::clone(&health),
140            Arc::clone(&rate_limiter),
141            Arc::clone(&circuit_breakers),
142            Arc::clone(&proxy_manager),
143            Arc::clone(&classifier),
144            Arc::clone(&semaphore),
145            Arc::clone(&throughput),
146        );
147        let shutdown_rx_sched = shutdown_tx.subscribe();
148        let _scheduler_handle = tokio::spawn(async move {
149            sched.run(shutdown_rx_sched).await;
150        });
151
152        // (b) State persistence (only if state_file is configured)
153        let _persist_handle = if config.state_file.is_some() {
154            let persist_config = Arc::clone(&config);
155            let persist_health = Arc::clone(&health);
156            let persist_proxy_mgr = Arc::clone(&proxy_manager);
157            let mut shutdown_rx_persist = shutdown_tx.subscribe();
158            let interval = config.state_save_interval;
159
160            Some(tokio::spawn(async move {
161                loop {
162                    tokio::select! {
163                        _ = shutdown_rx_persist.recv() => {
164                            debug!("persist task received shutdown signal");
165                            break;
166                        }
167                        _ = tokio::time::sleep(interval) => {
168                            if let Some(ref path) = persist_config.state_file {
169                                let health_stats = persist_health.get_all_stats();
170                                let proxy_states = build_proxy_states(&persist_proxy_mgr);
171                                if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
172                                    warn!(error = %e, "failed to save state");
173                                } else {
174                                    debug!("persisted state to disk");
175                                }
176                            }
177                        }
178                    }
179                }
180            }))
181        } else {
182            None
183        };
184
185        // (c) Metrics logging
186        let _metrics_config = Arc::clone(&config);
187        let metrics_task_pool = Arc::clone(&task_pool);
188        let metrics_health = Arc::clone(&health);
189        let metrics_proxy_mgr = Arc::clone(&proxy_manager);
190        let metrics_throughput = Arc::clone(&throughput);
191        let metrics_semaphore = Arc::clone(&semaphore);
192        let metrics_cb = Arc::clone(&circuit_breakers);
193        let mut shutdown_rx_metrics = shutdown_tx.subscribe();
194        let metrics_interval = config.metrics_log_interval;
195        let max_inflight = config.max_inflight;
196
197        let _metrics_handle = tokio::spawn(async move {
198            loop {
199                tokio::select! {
200                    _ = shutdown_rx_metrics.recv() => {
201                        debug!("metrics task received shutdown signal");
202                        break;
203                    }
204                    _ = tokio::time::sleep(metrics_interval) => {
205                        let tp = metrics_throughput.throughput(Duration::from_secs(10));
206                        let total_s = metrics_health.total_success();
207                        let total_f = metrics_health.total_fail();
208                        let total = total_s + total_f;
209                        let success_pct = if total > 0 {
210                            (total_s as f64 / total as f64) * 100.0
211                        } else {
212                            0.0
213                        };
214                        let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
215                        let pending = metrics_task_pool.pending_count();
216                        let done = metrics_task_pool.completed_count();
217                        let failed = metrics_task_pool.failed_count();
218                        let inflight = max_inflight - metrics_semaphore.available_permits();
219                        let breakers = metrics_cb.get_all();
220                        let open_breakers: Vec<&String> = breakers.iter()
221                            .filter(|(_, &open)| open)
222                            .map(|(h, _)| h)
223                            .collect();
224
225                        info!(
226                            "throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} done {} failed | inflight={} | breakers: {}",
227                            tp,
228                            success_pct,
229                            healthy,
230                            cooldown,
231                            dead,
232                            pending,
233                            done,
234                            failed,
235                            inflight,
236                            if open_breakers.is_empty() {
237                                "none".to_string()
238                            } else {
239                                format!("{:?}", open_breakers)
240                            }
241                        );
242                    }
243                }
244            }
245        });
246
247        // (d) Source refresh
248        let effective_sources = Arc::new(effective_sources);
249        let refresh_sources = Arc::clone(&effective_sources);
250        let refresh_config = Arc::clone(&config);
251        let refresh_proxy_mgr = Arc::clone(&proxy_manager);
252        let mut shutdown_rx_refresh = shutdown_tx.subscribe();
253        let refresh_interval = config.source_refresh_interval;
254
255        let _refresh_handle = tokio::spawn(async move {
256            loop {
257                tokio::select! {
258                    _ = shutdown_rx_refresh.recv() => {
259                        debug!("refresh task received shutdown signal");
260                        break;
261                    }
262                    _ = tokio::time::sleep(refresh_interval) => {
263                        match refresh_proxy_mgr
264                            .fetch_and_add(&refresh_sources, refresh_config.prefer_remote_dns)
265                            .await
266                        {
267                            Ok(count) => {
268                                debug!(new_count = count, "refreshed proxy sources");
269                            }
270                            Err(e) => {
271                                warn!(error = %e, "failed to refresh proxy sources");
272                            }
273                        }
274                    }
275                }
276            }
277        });
278
279        Ok(ScatterProxy {
280            config,
281            task_pool,
282            health,
283            rate_limiter,
284            circuit_breakers,
285            proxy_manager,
286            throughput,
287            semaphore,
288            shutdown_tx,
289            _scheduler_handle,
290            _persist_handle,
291            _metrics_handle,
292            _refresh_handle,
293        })
294    }
295
296    /// Submit a single request for proxied execution.
297    ///
298    /// Blocks until the pool has capacity, then returns a [`TaskHandle`] whose
299    /// `.await` blocks until a successful proxied response is obtained.  The
300    /// scheduler retries internally forever — use [`TaskHandle::with_timeout`]
301    /// or wrap with `tokio::time::timeout` to impose a caller-side deadline.
302    pub async fn submit(&self, request: reqwest::Request) -> TaskHandle {
303        self.task_pool.submit(request).await
304    }
305
306    /// Non-blocking submit.  Returns `Err(PoolFull)` immediately when the pool
307    /// is at capacity instead of blocking.
308    pub fn try_submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
309        self.task_pool.try_submit(request)
310    }
311
312    /// Submit a batch of requests for proxied execution, blocking until the
313    /// pool has capacity for each request sequentially.
314    pub async fn submit_batch(&self, requests: Vec<reqwest::Request>) -> Vec<TaskHandle> {
315        self.task_pool.submit_batch(requests).await
316    }
317
318    /// Non-blocking atomic batch submit.  If the pool doesn't have enough
319    /// capacity for the entire batch, the whole batch is rejected.
320    pub fn try_submit_batch(
321        &self,
322        requests: Vec<reqwest::Request>,
323    ) -> Result<Vec<TaskHandle>, ScatterProxyError> {
324        self.task_pool.try_submit_batch(requests)
325    }
326
327    /// Submit with a deadline on the *submission* step itself.
328    ///
329    /// Blocks up to `timeout` waiting for pool capacity.  Returns
330    /// `Err(Timeout)` if no slot opens in time.  Once a slot is obtained the
331    /// returned `TaskHandle` blocks indefinitely (retry forever) until a
332    /// response arrives — use [`TaskHandle::with_timeout`] to bound that step.
333    pub async fn submit_timeout(
334        &self,
335        request: reqwest::Request,
336        timeout: Duration,
337    ) -> Result<TaskHandle, ScatterProxyError> {
338        self.task_pool.submit_timeout(request, timeout).await
339    }
340
341    /// Build a [`PoolMetrics`] snapshot from all internal components.
342    pub fn metrics(&self) -> PoolMetrics {
343        let (total, healthy, cooldown, dead) = self.proxy_manager.proxy_counts();
344
345        let total_s = self.health.total_success();
346        let total_f = self.health.total_fail();
347        let total_requests = total_s + total_f;
348        let success_rate = if total_requests > 0 {
349            total_s as f64 / total_requests as f64
350        } else {
351            0.0
352        };
353
354        PoolMetrics {
355            total_proxies: total,
356            healthy_proxies: healthy,
357            cooldown_proxies: cooldown,
358            dead_proxies: dead,
359
360            pending_tasks: self.task_pool.pending_count(),
361            completed_tasks: self.task_pool.completed_count(),
362            failed_tasks: self.task_pool.failed_count(),
363
364            throughput_1s: self.throughput.throughput(Duration::from_secs(1)),
365            throughput_10s: self.throughput.throughput(Duration::from_secs(10)),
366            throughput_60s: self.throughput.throughput(Duration::from_secs(60)),
367
368            success_rate_1m: success_rate,
369            avg_latency_ms: self.health.avg_latency_ms(),
370
371            inflight: self.config.max_inflight - self.semaphore.available_permits(),
372            circuit_breakers: self.circuit_breakers.get_all(),
373        }
374    }
375
376    /// Gracefully shut down the proxy: signal all background tasks, persist
377    /// final state, and drop resources.
378    pub async fn shutdown(self) {
379        info!("initiating scatter-proxy shutdown");
380
381        // Signal all background tasks to stop.
382        let _ = self.shutdown_tx.send(());
383
384        // Final state save if configured.
385        if let Some(ref path) = self.config.state_file {
386            let health_stats = self.health.get_all_stats();
387            let proxy_states = build_proxy_states(&self.proxy_manager);
388            if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
389                warn!(error = %e, "failed to save final state during shutdown");
390            } else {
391                debug!("saved final state to disk");
392            }
393        }
394
395        info!("scatter-proxy shutdown complete");
396        // JoinHandles are dropped here, which detaches the background tasks
397        // (they will exit via the shutdown signal).
398    }
399}
400
401/// Build a map of proxy URL → state string for persistence.
402fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
403    let urls = proxy_manager.all_proxy_urls();
404    let mut states = HashMap::with_capacity(urls.len());
405    for url in urls {
406        let state = proxy_manager.get_state(&url);
407        let label = match state {
408            proxy::ProxyState::Active => "Active",
409            proxy::ProxyState::Dead => "Dead",
410            proxy::ProxyState::Unknown => "Unknown",
411        };
412        states.insert(url, label.to_string());
413    }
414    states
415}