Skip to main content

scatter_proxy/
lib.rs

1//! scatter-proxy — async request scheduler for unreliable SOCKS5 proxies.
2//!
3//! Multi-path race for maximum throughput.
4
5mod circuit_breaker;
6mod classifier;
7mod config;
8mod error;
9mod health;
10mod metrics;
11mod persist;
12mod proxy;
13mod rate_limit;
14mod scheduler;
15mod score;
16mod task;
17
18// Re-exports (public API surface)
19pub use classifier::{BodyClassifier, BodyVerdict, DefaultClassifier};
20pub use config::{RateLimitConfig, ScatterProxyConfig};
21pub use error::ScatterProxyError;
22pub use metrics::{PoolMetrics, ProxyHostStats};
23pub use task::{ScatterResponse, TaskHandle};
24
25// Also re-export useful types for the classifier trait
26pub use http::HeaderMap;
27pub use http::StatusCode;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::Duration;
32use tokio::sync::{broadcast, Semaphore};
33use tokio::task::JoinHandle;
34use tracing::{debug, info, warn};
35
36/// The main entry point. Manages proxy pool, scheduling, and background tasks.
37#[allow(dead_code)]
38pub struct ScatterProxy {
39    config: Arc<ScatterProxyConfig>,
40    task_pool: Arc<task::TaskPool>,
41    health: Arc<health::HealthTracker>,
42    rate_limiter: Arc<rate_limit::RateLimiter>,
43    circuit_breakers: Arc<circuit_breaker::CircuitBreakerManager>,
44    proxy_manager: Arc<proxy::ProxyManager>,
45    throughput: Arc<metrics::ThroughputTracker>,
46    semaphore: Arc<Semaphore>,
47    shutdown_tx: broadcast::Sender<()>,
48    // Background task handles
49    _scheduler_handle: JoinHandle<()>,
50    _persist_handle: Option<JoinHandle<()>>,
51    _metrics_handle: JoinHandle<()>,
52    _refresh_handle: JoinHandle<()>,
53}
54
55impl ScatterProxy {
56    /// Create a new `ScatterProxy` from config and a body classifier.
57    ///
58    /// This is an async constructor that fetches proxies, restores persisted
59    /// state, and spawns all background tasks (scheduler, persistence, metrics
60    /// logging, source refresh).
61    pub async fn new(
62        config: ScatterProxyConfig,
63        classifier: impl BodyClassifier,
64    ) -> Result<Self, ScatterProxyError> {
65        let config = Arc::new(config);
66
67        // Build all Arc-wrapped components from config.
68        let task_pool = Arc::new(task::TaskPool::new(config.task_pool_capacity));
69        let health = Arc::new(health::HealthTracker::new(config.health_window));
70        let rate_limiter = Arc::new(rate_limit::RateLimiter::new(&config.rate_limit));
71        let circuit_breakers = Arc::new(circuit_breaker::CircuitBreakerManager::new(
72            config.circuit_breaker_threshold,
73            config.circuit_breaker_probe_interval,
74        ));
75        let proxy_manager = Arc::new(proxy::ProxyManager::new(config.proxy_timeout));
76        let throughput = Arc::new(metrics::ThroughputTracker::new());
77        let semaphore = Arc::new(Semaphore::new(config.max_inflight));
78        let classifier: Arc<dyn BodyClassifier> = Arc::new(classifier);
79
80        // Fetch proxies from configured sources.
81        let proxy_count = proxy_manager
82            .fetch_and_add(&config.sources, config.prefer_remote_dns)
83            .await?;
84
85        // Restore persisted state if configured.
86        if let Some(ref state_path) = config.state_file {
87            match persist::load_state(state_path).await {
88                Ok(Some(persisted)) => {
89                    for (proxy_url, persisted_proxy) in &persisted.proxies {
90                        // Restore per-host health data.
91                        for (host, stats) in &persisted_proxy.hosts {
92                            health.restore(proxy_url, host, stats);
93                        }
94                        // Restore proxy state.
95                        match persisted_proxy.state.as_str() {
96                            "Active" => {
97                                proxy_manager.set_state(proxy_url, proxy::ProxyState::Active)
98                            }
99                            "Dead" => proxy_manager.set_state(proxy_url, proxy::ProxyState::Dead),
100                            _ => {} // Unknown or unrecognised — leave as default
101                        }
102                    }
103                    debug!(
104                        proxies = persisted.proxies.len(),
105                        "restored persisted state"
106                    );
107                }
108                Ok(None) => {
109                    debug!("no persisted state file found, starting fresh");
110                }
111                Err(e) => {
112                    warn!(error = %e, "failed to load persisted state, starting fresh");
113                }
114            }
115        }
116
117        info!(count = proxy_count, "loaded proxies from sources");
118
119        // Shutdown broadcast channel.
120        let (shutdown_tx, _) = broadcast::channel(1);
121
122        // ── Spawn background tasks ──────────────────────────────────────
123
124        // (a) Scheduler
125        let sched = scheduler::Scheduler::new(
126            Arc::clone(&config),
127            Arc::clone(&task_pool),
128            Arc::clone(&health),
129            Arc::clone(&rate_limiter),
130            Arc::clone(&circuit_breakers),
131            Arc::clone(&proxy_manager),
132            Arc::clone(&classifier),
133            Arc::clone(&semaphore),
134            Arc::clone(&throughput),
135        );
136        let shutdown_rx_sched = shutdown_tx.subscribe();
137        let _scheduler_handle = tokio::spawn(async move {
138            sched.run(shutdown_rx_sched).await;
139        });
140
141        // (b) State persistence (only if state_file is configured)
142        let _persist_handle = if config.state_file.is_some() {
143            let persist_config = Arc::clone(&config);
144            let persist_health = Arc::clone(&health);
145            let persist_proxy_mgr = Arc::clone(&proxy_manager);
146            let mut shutdown_rx_persist = shutdown_tx.subscribe();
147            let interval = config.state_save_interval;
148
149            Some(tokio::spawn(async move {
150                loop {
151                    tokio::select! {
152                        _ = shutdown_rx_persist.recv() => {
153                            debug!("persist task received shutdown signal");
154                            break;
155                        }
156                        _ = tokio::time::sleep(interval) => {
157                            if let Some(ref path) = persist_config.state_file {
158                                let health_stats = persist_health.get_all_stats();
159                                let proxy_states = build_proxy_states(&persist_proxy_mgr);
160                                if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
161                                    warn!(error = %e, "failed to save state");
162                                } else {
163                                    debug!("persisted state to disk");
164                                }
165                            }
166                        }
167                    }
168                }
169            }))
170        } else {
171            None
172        };
173
174        // (c) Metrics logging
175        let _metrics_config = Arc::clone(&config);
176        let metrics_task_pool = Arc::clone(&task_pool);
177        let metrics_health = Arc::clone(&health);
178        let metrics_proxy_mgr = Arc::clone(&proxy_manager);
179        let metrics_throughput = Arc::clone(&throughput);
180        let metrics_semaphore = Arc::clone(&semaphore);
181        let metrics_cb = Arc::clone(&circuit_breakers);
182        let mut shutdown_rx_metrics = shutdown_tx.subscribe();
183        let metrics_interval = config.metrics_log_interval;
184        let max_inflight = config.max_inflight;
185
186        let _metrics_handle = tokio::spawn(async move {
187            loop {
188                tokio::select! {
189                    _ = shutdown_rx_metrics.recv() => {
190                        debug!("metrics task received shutdown signal");
191                        break;
192                    }
193                    _ = tokio::time::sleep(metrics_interval) => {
194                        let tp = metrics_throughput.throughput(Duration::from_secs(10));
195                        let total_s = metrics_health.total_success();
196                        let total_f = metrics_health.total_fail();
197                        let total = total_s + total_f;
198                        let success_pct = if total > 0 {
199                            (total_s as f64 / total as f64) * 100.0
200                        } else {
201                            0.0
202                        };
203                        let (_, healthy, cooldown, dead) = metrics_proxy_mgr.proxy_counts();
204                        let pending = metrics_task_pool.pending_count();
205                        let done = metrics_task_pool.completed_count();
206                        let failed = metrics_task_pool.failed_count();
207                        let inflight = max_inflight - metrics_semaphore.available_permits();
208                        let breakers = metrics_cb.get_all();
209                        let open_breakers: Vec<&String> = breakers.iter()
210                            .filter(|(_, &open)| open)
211                            .map(|(h, _)| h)
212                            .collect();
213
214                        info!(
215                            "throughput={:.1}/s | success={:.0}% | pool: {} healthy / {} cooldown / {} dead | tasks: {} pending {} done {} failed | inflight={} | breakers: {}",
216                            tp,
217                            success_pct,
218                            healthy,
219                            cooldown,
220                            dead,
221                            pending,
222                            done,
223                            failed,
224                            inflight,
225                            if open_breakers.is_empty() {
226                                "none".to_string()
227                            } else {
228                                format!("{:?}", open_breakers)
229                            }
230                        );
231                    }
232                }
233            }
234        });
235
236        // (d) Source refresh
237        let refresh_config = Arc::clone(&config);
238        let refresh_proxy_mgr = Arc::clone(&proxy_manager);
239        let mut shutdown_rx_refresh = shutdown_tx.subscribe();
240        let refresh_interval = config.source_refresh_interval;
241
242        let _refresh_handle = tokio::spawn(async move {
243            loop {
244                tokio::select! {
245                    _ = shutdown_rx_refresh.recv() => {
246                        debug!("refresh task received shutdown signal");
247                        break;
248                    }
249                    _ = tokio::time::sleep(refresh_interval) => {
250                        match refresh_proxy_mgr
251                            .fetch_and_add(&refresh_config.sources, refresh_config.prefer_remote_dns)
252                            .await
253                        {
254                            Ok(count) => {
255                                debug!(new_count = count, "refreshed proxy sources");
256                            }
257                            Err(e) => {
258                                warn!(error = %e, "failed to refresh proxy sources");
259                            }
260                        }
261                    }
262                }
263            }
264        });
265
266        Ok(ScatterProxy {
267            config,
268            task_pool,
269            health,
270            rate_limiter,
271            circuit_breakers,
272            proxy_manager,
273            throughput,
274            semaphore,
275            shutdown_tx,
276            _scheduler_handle,
277            _persist_handle,
278            _metrics_handle,
279            _refresh_handle,
280        })
281    }
282
283    /// Submit a single request for proxied execution.
284    ///
285    /// Returns a [`TaskHandle`] that implements `Future` and resolves to the
286    /// proxied response (or an error).
287    pub fn submit(&self, request: reqwest::Request) -> Result<TaskHandle, ScatterProxyError> {
288        self.task_pool
289            .submit(request, self.config.max_attempts, self.config.task_timeout)
290    }
291
292    /// Submit a batch of requests for proxied execution.
293    ///
294    /// All requests are added atomically — if the pool doesn't have enough
295    /// capacity the entire batch is rejected.
296    pub fn submit_batch(
297        &self,
298        requests: Vec<reqwest::Request>,
299    ) -> Result<Vec<TaskHandle>, ScatterProxyError> {
300        self.task_pool
301            .submit_batch(requests, self.config.max_attempts, self.config.task_timeout)
302    }
303
304    /// Build a [`PoolMetrics`] snapshot from all internal components.
305    pub fn metrics(&self) -> PoolMetrics {
306        let (total, healthy, cooldown, dead) = self.proxy_manager.proxy_counts();
307
308        let total_s = self.health.total_success();
309        let total_f = self.health.total_fail();
310        let total_requests = total_s + total_f;
311        let success_rate = if total_requests > 0 {
312            total_s as f64 / total_requests as f64
313        } else {
314            0.0
315        };
316
317        PoolMetrics {
318            total_proxies: total,
319            healthy_proxies: healthy,
320            cooldown_proxies: cooldown,
321            dead_proxies: dead,
322
323            pending_tasks: self.task_pool.pending_count(),
324            completed_tasks: self.task_pool.completed_count(),
325            failed_tasks: self.task_pool.failed_count(),
326
327            throughput_1s: self.throughput.throughput(Duration::from_secs(1)),
328            throughput_10s: self.throughput.throughput(Duration::from_secs(10)),
329            throughput_60s: self.throughput.throughput(Duration::from_secs(60)),
330
331            success_rate_1m: success_rate,
332            avg_latency_ms: self.health.avg_latency_ms(),
333
334            inflight: self.config.max_inflight - self.semaphore.available_permits(),
335            circuit_breakers: self.circuit_breakers.get_all(),
336        }
337    }
338
339    /// Gracefully shut down the proxy: signal all background tasks, persist
340    /// final state, and drop resources.
341    pub async fn shutdown(self) {
342        info!("initiating scatter-proxy shutdown");
343
344        // Signal all background tasks to stop.
345        let _ = self.shutdown_tx.send(());
346
347        // Final state save if configured.
348        if let Some(ref path) = self.config.state_file {
349            let health_stats = self.health.get_all_stats();
350            let proxy_states = build_proxy_states(&self.proxy_manager);
351            if let Err(e) = persist::save_state(path, &health_stats, &proxy_states).await {
352                warn!(error = %e, "failed to save final state during shutdown");
353            } else {
354                debug!("saved final state to disk");
355            }
356        }
357
358        info!("scatter-proxy shutdown complete");
359        // JoinHandles are dropped here, which detaches the background tasks
360        // (they will exit via the shutdown signal).
361    }
362}
363
364/// Build a map of proxy URL → state string for persistence.
365fn build_proxy_states(proxy_manager: &proxy::ProxyManager) -> HashMap<String, String> {
366    let urls = proxy_manager.all_proxy_urls();
367    let mut states = HashMap::with_capacity(urls.len());
368    for url in urls {
369        let state = proxy_manager.get_state(&url);
370        let label = match state {
371            proxy::ProxyState::Active => "Active",
372            proxy::ProxyState::Dead => "Dead",
373            proxy::ProxyState::Unknown => "Unknown",
374        };
375        states.insert(url, label.to_string());
376    }
377    states
378}