Skip to main content

scatter_proxy/
lib.rs

1//! scatter-proxy — async request scheduler for unreliable SOCKS5 proxies.
2//!
3//! Multi-path race for maximum throughput.
4
5mod circuit_breaker;
6mod classifier;
7mod config;
8mod error;
9mod health;
10mod metrics;
11mod persist;
12mod proxy;
13mod rate_limit;
14mod scheduler;
15mod score;
16mod task;
17
18// Re-exports (public API surface)
19pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
20pub use config::{RateLimitConfig, ScatterProxyConfig, DEFAULT_PROXY_SOURCES};
21pub use error::ScatterProxyError;
22pub use metrics::{PoolMetrics, ProxyHostStats};
23pub use task::{ScatterResponse, TaskHandle};
24
25// Also re-export useful types for the classifier trait
26pub use http::HeaderMap;
27pub use http::StatusCode;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::Duration;
32use tokio::sync::{broadcast, Semaphore};
33use tokio::task::JoinHandle;
34use tracing::{debug, info, warn};
35
/// The main entry point. Manages proxy pool, scheduling, and background tasks.
///
/// Constructed with [`ScatterProxy::new`], which also spawns the background
/// tasks whose handles are stored in the `_*_handle` fields.
// NOTE(review): the blanket `dead_code` allowance is presumably here because
// some fields are only held to keep shared state / task handles alive and are
// never read directly — confirm before narrowing it.
#[allow(dead_code)]
pub struct ScatterProxy {
    /// Shared configuration; also handed to the scheduler and periodic tasks.
    config: Arc<ScatterProxyConfig>,
    /// Pool that accepts submitted requests and tracks pending / completed /
    /// failed counts.
    task_pool: Arc<task::TaskPool>,
    /// Per-proxy, per-host health statistics (success / failure totals, latency).
    health: Arc<health::HealthTracker>,
    /// Rate limiter built from `config.rate_limit`; used by the scheduler.
    rate_limiter: Arc<rate_limit::RateLimiter>,
    /// Circuit breakers, reported per key by `get_all()`.
    circuit_breakers: Arc<circuit_breaker::CircuitBreakerManager>,
    /// Owns the set of known proxies and their states.
    proxy_manager: Arc<proxy::ProxyManager>,
    /// Throughput tracker queried over sliding windows (1 s / 10 s / 60 s).
    throughput: Arc<metrics::ThroughputTracker>,
    /// Semaphore created with `config.max_inflight` permits; the in-flight
    /// count is derived from the permits currently available.
    semaphore: Arc<Semaphore>,
    /// Broadcast sender used to tell every background task to stop; each task
    /// holds its own subscription.
    shutdown_tx: broadcast::Sender<()>,
    // Background task handles
    /// Handle of the scheduler task.
    _scheduler_handle: JoinHandle<()>,
    /// Handle of the periodic state-persistence task; `None` when no
    /// `state_file` is configured.
    _persist_handle: Option<JoinHandle<()>>,
    /// Handle of the periodic metrics-logging task.
    _metrics_handle: JoinHandle<()>,
    /// Handle of the periodic proxy-source refresh task.
    _refresh_handle: JoinHandle<()>,
}
54
55impl ScatterProxy {
56    /// Create a new `ScatterProxy` from config and a body classifier.
57    ///
58    /// This is an async constructor that fetches proxies, restores persisted
59    /// state, and spawns all background tasks (scheduler, persistence, metrics
60    /// logging, source refresh).
61    pub async fn new(
62        config: ScatterProxyConfig,
63        classifier: impl BodyClassifier,
64    ) -> Result<Self, ScatterProxyError> {
65        let config = Arc::new(config);
66
67        // Build all Arc-wrapped components from config.
68        let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
69        let health = Arc::new(health::HealthTracker::new(config.health_window));
70        let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
71        let circuit_breakers = Arc::new(circuit_breaker::CircuitBreakerManager::new(
72            config.circuit_breaker_threshold,
73            config.circuit_breaker_probe_interval,
74        ));
75        let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
76        let throughput = Arc::new(metrics::ThroughputTracker::new());
77        let semaphore = Arc::new(Semaphore::new(config.max_inflight));
78        let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
79
80        // Determine sources: use configured sources, or fall back to defaults.
81        let effective_sources: Vec<String> = if config.sources.is_empty() {
82            info!("no custom sources configured, using default free proxy sources");
83            DEFAULT_PROXY_SOURCES
84                .iter()
85                .map(|s| s.to_string())
86                .collect()
87        } else {
88            config.sources.clone()
89        };
90
91        // Fetch proxies from sources.
92        let proxy_count = proxy_manager
93            .fetch_and_add(&effective_sources, config.prefer_remote_dns)
94            .await?;
95
96        // Restore persisted state if configured.
97        if let Some(ref state_path) = config.state_file {
98            match persist::load_state(state_path).await {
99                Ok(Some(persisted)) => {
100                    for (proxy_url, persisted_proxy) in &persisted.proxies {
101                        // Restore per-host health data.
102                        for (host, stats) in &persisted_proxy.hosts {
103                            health.restore(proxy_url, host, stats);
104                        }
105                        // Restore proxy state.
106                        match persisted_proxy.state.as_str() {
107                            "Active" => {
108                                proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
109                            }
110                            "Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
111                            _ => {} // Unknown or unrecognised — leave as default
112                        }
113                    }
114                    debug!(
115                        proxies = persisted.proxies.len(),
116                        "restored persisted state"
117                    );
118                }
119                Ok(None) => {
120                    debug!("no persisted state file found, starting fresh");
121                }
122                Err(e) => {
123                    warn!(error = %e, "failed to load persisted state, starting fresh");
124                }
125            }
126        }
127
128        info!(count = proxy_count, "loaded proxies from sources");
129
130        // Shutdown broadcast channel.
131        let (shutdown_tx, _) = broadcast::channel(1);
132
133        // ── Spawn background tasks ──────────────────────────────────────
134
135        // (a) Scheduler
136        let sched = scheduler::Scheduler::new(
137            Arc::clone(&config),
138            Arc::clone(&task_pool),
139            Arc::clone(&health),
140            Arc::clone(&rate_limiter),
141            Arc::clone(&circuit_breakers),
142            Arc::clone(&proxy_manager),
143            Arc::clone(&classifier),
144            Arc::clone(&semaphore),
145            Arc::clone(&throughput),
146        );
147        let shutdown_rx_sched = shutdown_tx.subscribe();
148        let _scheduler_handle = tokio::spawn(async move {
149            sched.run(shutdown_rx_sched).await;
150        });
151
152        // (b) State persistence (only if state_file is configured)
153        let _persist_handle = if config.state_file.is_some() {
154            let persist_config = Arc::clone(&config);
155            let persist_health = Arc::clone(&health);
156            let persist_proxy_mgr = Arc::clone(&proxy_manager);
157            let mut shutdown_rx_persist = shutdown_tx.subscribe();
158            let interval = config.state_save_interval;
159
160            Some(tokio::spawn(async move {
161                loop {
162                    tokio::select! {
163                        _ = shutdown_rx_persist.recv() => {
164                            debug!("persist task received shutdown signal");
165                            break;
166                        }
167                        _ = tokio::time::sleep(interval) => {
168                            if let Some(ref path) = persist_config.state_file {
169                                let health_stats = persist_health.get_all_stats();
170                                let proxy_states = build_proxy_states(&persist_proxy_mgr);
171                                if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
172                                    warn!(error = %e, "failed to save state");
173                                } else {
174                                    debug!("persisted state to disk");
175                                }
176                            }
177                        }
178                    }
179                }
180            }))
181        } else {
182            None
183        };
184
185        // (c) Metrics logging
186        let _metrics_config = Arc::clone(&config);
187        let metrics_task_pool = Arc::clone(&task_pool);
188        let metrics_health = Arc::clone(&health);
189        let metrics_proxy_mgr = Arc::clone(&proxy_manager);
190        let metrics_throughput = Arc::clone(&throughput);
191        let metrics_semaphore = Arc::clone(&semaphore);
192        let metrics_cb = Arc::clone(&circuit_breakers);
193        let mut shutdown_rx_metrics = shutdown_tx.subscribe();
194        let metrics_interval = config.metrics_log_interval;
195        let max_inflight = config.max_inflight;
196
197        let _metrics_handle = tokio::spawn(async move {
198            loop {
199                tokio::select! {
200                    _ = shutdown_rx_metrics.recv() => {
201                        debug!("metrics task received shutdown signal");
202                        break;
203                    }
204                    _ = tokio::time::sleep(metrics_interval) => {
205                        let tp = metrics_throughput.throughput(Duration::from_secs(10));
206                        let total_s = metrics_health.total_success();
207                        let total_f = metrics_health.total_fail();
208                        let total = total_s + total_f;
209                        let success_pct = if total > 0 {
210                            (total_s as f64 / total as f64) * 100.0
211                        } else {
212                            0.0
213                        };
214                        let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
215                        let pending = metrics_task_pool.pending_count();
216                        let done = metrics_task_pool.completed_count();
217                        let failed = metrics_task_pool.failed_count();
218                        let inflight = max_inflight - metrics_semaphore.available_permits();
219                        let breakers = metrics_cb.get_all();
220                        let open_breakers: Vec<&String> = breakers.iter()
221                            .filter(|(_, &open)| open)
222                            .map(|(h, _)| h)
223                            .collect();
224
225                        info!(
226                            "throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} done {} failed | inflight={} | breakers: {}",
227                            tp,
228                            success_pct,
229                            healthy,
230                            cooldown,
231                            dead,
232                            pending,
233                            done,
234                            failed,
235                            inflight,
236                            if open_breakers.is_empty() {
237                                "none".to_string()
238                            } else {
239                                format!("{:?}", open_breakers)
240                            }
241                        );
242                    }
243                }
244            }
245        });
246
247        // (d) Source refresh
248        let effective_sources = Arc::new(effective_sources);
249        let refresh_sources = Arc::clone(&effective_sources);
250        let refresh_config = Arc::clone(&config);
251        let refresh_proxy_mgr = Arc::clone(&proxy_manager);
252        let mut shutdown_rx_refresh = shutdown_tx.subscribe();
253        let refresh_interval = config.source_refresh_interval;
254
255        let _refresh_handle = tokio::spawn(async move {
256            loop {
257                tokio::select! {
258                    _ = shutdown_rx_refresh.recv() => {
259                        debug!("refresh task received shutdown signal");
260                        break;
261                    }
262                    _ = tokio::time::sleep(refresh_interval) => {
263                        match refresh_proxy_mgr
264                            .fetch_and_add(&refresh_sources, refresh_config.prefer_remote_dns)
265                            .await
266                        {
267                            Ok(count) => {
268                                debug!(new_count = count, "refreshed proxy sources");
269                            }
270                            Err(e) => {
271                                warn!(error = %e, "failed to refresh proxy sources");
272                            }
273                        }
274                    }
275                }
276            }
277        });
278
279        Ok(ScatterProxy {
280            config,
281            task_pool,
282            health,
283            rate_limiter,
284            circuit_breakers,
285            proxy_manager,
286            throughput,
287            semaphore,
288            shutdown_tx,
289            _scheduler_handle,
290            _persist_handle,
291            _metrics_handle,
292            _refresh_handle,
293        })
294    }
295
296    /// Submit a single request for proxied execution.
297    ///
298    /// Returns a [`TaskHandle`] that implements `Future` and resolves to the
299    /// proxied response (or an error).
300    pub fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
301        self.task_pool
302            .submit(request, self.config.max_attempts, self.config.task_timeout)
303    }
304
305    /// Submit a batch of requests for proxied execution.
306    ///
307    /// All requests are added atomically — if the pool doesn't have enough
308    /// capacity the entire batch is rejected.
309    pub fn submit_batch(
310        &self,
311        requests: Vec<reqwest::Request>,
312    ) -> Result<Vec<TaskHandle>, ScatterProxyError> {
313        self.task_pool
314            .submit_batch(requests, self.config.max_attempts, self.config.task_timeout)
315    }
316
317    /// Build a [`PoolMetrics`] snapshot from all internal components.
318    pub fn metrics(&self) -> PoolMetrics {
319        let (total, healthy, cooldown, dead) = self.proxy_manager.proxy_counts();
320
321        let total_s = self.health.total_success();
322        let total_f = self.health.total_fail();
323        let total_requests = total_s + total_f;
324        let success_rate = if total_requests > 0 {
325            total_s as f64 / total_requests as f64
326        } else {
327            0.0
328        };
329
330        PoolMetrics {
331            total_proxies: total,
332            healthy_proxies: healthy,
333            cooldown_proxies: cooldown,
334            dead_proxies: dead,
335
336            pending_tasks: self.task_pool.pending_count(),
337            completed_tasks: self.task_pool.completed_count(),
338            failed_tasks: self.task_pool.failed_count(),
339
340            throughput_1s: self.throughput.throughput(Duration::from_secs(1)),
341            throughput_10s: self.throughput.throughput(Duration::from_secs(10)),
342            throughput_60s: self.throughput.throughput(Duration::from_secs(60)),
343
344            success_rate_1m: success_rate,
345            avg_latency_ms: self.health.avg_latency_ms(),
346
347            inflight: self.config.max_inflight - self.semaphore.available_permits(),
348            circuit_breakers: self.circuit_breakers.get_all(),
349        }
350    }
351
352    /// Gracefully shut down the proxy: signal all background tasks, persist
353    /// final state, and drop resources.
354    pub async fn shutdown(self) {
355        info!("initiating scatter-proxy shutdown");
356
357        // Signal all background tasks to stop.
358        let _ = self.shutdown_tx.send(());
359
360        // Final state save if configured.
361        if let Some(ref path) = self.config.state_file {
362            let health_stats = self.health.get_all_stats();
363            let proxy_states = build_proxy_states(&self.proxy_manager);
364            if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
365                warn!(error = %e, "failed to save final state during shutdown");
366            } else {
367                debug!("saved final state to disk");
368            }
369        }
370
371        info!("scatter-proxy shutdown complete");
372        // JoinHandles are dropped here, which detaches the background tasks
373        // (they will exit via the shutdown signal).
374    }
375}
376
377/// Build a map of proxy URL → state string for persistence.
378fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
379    let urls = proxy_manager.all_proxy_urls();
380    let mut states = HashMap::with_capacity(urls.len());
381    for url in urls {
382        let state = proxy_manager.get_state(&url);
383        let label = match state {
384            proxy::ProxyState::Active => "Active",
385            proxy::ProxyState::Dead => "Dead",
386            proxy::ProxyState::Unknown => "Unknown",
387        };
388        states.insert(url, label.to_string());
389    }
390    states
391}