Skip to main content

proxychains_masq/
chain.rs

1use std::{
2    collections::HashMap,
3    io::Write,
4    net::{IpAddr, Ipv4Addr, Ipv6Addr},
5    sync::{
6        atomic::{AtomicUsize, Ordering},
7        RwLock,
8    },
9    time::Duration,
10};
11
12use anyhow::{bail, Context, Result};
13use rand::{rngs::OsRng, seq::SliceRandom};
14use tokio::{net::TcpStream, time::timeout};
15
16use crate::proxy::{http, https, raw, socks4, socks5, BoxStream, Target};
17
18// ─── Public types re-exported from config ─────────────────────────────────────
19
/// Strategy used to pick which proxies form the chain for a connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainType {
    /// Every configured proxy, in order, in a single attempt; the dead-proxy
    /// filter is bypassed.
    Strict,
    /// All live proxies in order; after a failed attempt the chain is retried
    /// starting one proxy further down the list.
    Dynamic,
    /// `chain_len` proxies drawn at random from the live pool.
    Random,
    /// `chain_len` consecutive live proxies starting at a shared, rotating
    /// offset.
    RoundRobin,
}
27
/// Protocol spoken to a proxy when asking it to open the next hop.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProxyType {
    /// Handshake via `socks4::connect`; only the username is forwarded.
    Socks4,
    /// Handshake via `socks5::connect`; username and password are forwarded.
    Socks5,
    /// Handshake via `http::connect`; username and password are forwarded.
    Http,
    /// HTTP CONNECT over TLS.  Whether to verify the proxy certificate is
    /// controlled by [`ChainConfig::tls_skip_verify`].
    Https,
    /// Handshake via `raw::connect`; no credentials are forwarded.
    Raw,
}
38
/// A single proxy in the chain.
#[derive(Debug, Clone)]
pub struct ProxyEntry {
    /// Protocol used for the handshake with this proxy.
    pub proxy_type: ProxyType,
    /// Address of the proxy; together with `port` it is also the key used
    /// for failure accounting.
    pub addr: IpAddr,
    /// TCP port of the proxy.
    pub port: u16,
    /// Optional username passed to the protocol handshake.
    pub username: Option<String>,
    /// Optional password passed to the protocol handshake (ignored by
    /// SOCKS4 and raw proxies).
    pub password: Option<String>,
}
48
/// A localnet exclusion rule.
///
/// Targets matching a rule are connected to directly instead of through the
/// proxy chain.
#[derive(Debug, Clone)]
pub struct LocalNet {
    /// Network address the target is compared against.
    pub addr: IpAddr,
    /// Netmask applied when both the target and `addr` are IPv4.  When
    /// `None`, the rule never matches an IPv4 target.
    pub mask_v4: Option<Ipv4Addr>,
    /// Prefix length applied when both the target and `addr` are IPv6.  When
    /// `None`, the rule never matches an IPv6 target.
    pub prefix_v6: Option<u8>,
    /// Restrict the rule to this target port; `None` matches any port.
    pub port: Option<u16>,
}
57
/// A DNAT rewrite rule.
///
/// Applies to IPv4 targets only; the first matching rule wins.
#[derive(Debug, Clone)]
pub struct DnatRule {
    /// Destination address that triggers the rewrite.
    pub orig_addr: Ipv4Addr,
    /// Destination port that triggers the rewrite; `None` matches any port.
    pub orig_port: Option<u16>,
    /// Address substituted for `orig_addr`.
    pub new_addr: Ipv4Addr,
    /// Port substituted for the original port; `None` keeps the original.
    pub new_port: Option<u16>,
}
66
/// Configuration for [`ChainEngine`].
#[derive(Debug, Clone)]
pub struct ChainConfig {
    /// Proxies available for chain construction, in configuration order.
    pub proxies: Vec<ProxyEntry>,
    /// Strategy used to select proxies for each connection.
    pub chain_type: ChainType,
    /// Number of proxies to use per connection (for Random / RoundRobin).
    ///
    /// A value of `0` is treated as `1`.
    pub chain_len: usize,
    /// How many times to retry with a new chain selection after total failure.
    ///
    /// Only applies to `Random` and `RoundRobin` chains, which select a subset
    /// of proxies each attempt.  `Strict` and `Dynamic` already try every proxy
    /// in the list, so retrying would repeat the same sequence.
    pub chain_retries: usize,
    /// Upper bound on each TCP connect, both to the first proxy of a chain
    /// and for direct (localnet) connections.
    pub connect_timeout: Duration,
    /// Targets matching any of these rules bypass the proxy chain.
    pub localnets: Vec<LocalNet>,
    /// Destination rewrites applied before localnet matching and proxy
    /// selection.
    pub dnats: Vec<DnatRule>,
    /// When `true`, TLS certificate validation is skipped for all HTTPS proxies.
    ///
    /// Controlled by the `--tls-skip-verify` CLI flag.  Defaults to `false`.
    pub tls_skip_verify: bool,
    /// Number of failures before a proxy is excluded from chain selection.
    ///
    /// A proxy that fails (TCP connect or protocol handshake) this many times
    /// across all connections is marked dead and skipped.  Set to `0` to
    /// disable the dead-proxy filter entirely.  Default: 3.
    pub proxy_dead_threshold: usize,
}
94
95impl Default for ChainConfig {
96    fn default() -> Self {
97        ChainConfig {
98            proxies: Vec::new(),
99            chain_type: ChainType::Dynamic,
100            chain_len: 1,
101            chain_retries: 3,
102            connect_timeout: Duration::from_secs(10),
103            localnets: Vec::new(),
104            dnats: Vec::new(),
105            tls_skip_verify: false,
106            proxy_dead_threshold: 3,
107        }
108    }
109}
110
111// ─── ChainEngine ─────────────────────────────────────────────────────────────
112
/// Routes outbound TCP connections through a configurable chain of proxies.
///
/// Besides the immutable configuration, the engine keeps two pieces of
/// shared mutable state: the round-robin cursor and per-proxy failure counts.
pub struct ChainEngine {
    /// Configuration captured at construction time.
    config: ChainConfig,
    /// Shared offset for round-robin mode.
    rr_offset: AtomicUsize,
    /// Cumulative failure counts keyed by `(addr, port)`.  Once a proxy
    /// reaches `config.proxy_dead_threshold`, it is excluded from selection.
    failure_counts: RwLock<HashMap<(IpAddr, u16), usize>>,
}
122
123impl ChainEngine {
124    /// Create a new engine from `config`.
125    pub fn new(config: ChainConfig) -> Self {
126        ChainEngine {
127            config,
128            rr_offset: AtomicUsize::new(0),
129            failure_counts: RwLock::new(HashMap::new()),
130        }
131    }
132
    /// Return the current round-robin offset without advancing it.
    ///
    /// Useful for inspection and benchmarking; the value may change
    /// concurrently and is only a snapshot.  The raw counter is returned,
    /// i.e. it has not been reduced modulo the number of proxies.
    pub fn rr_peek_offset(&self) -> usize {
        // A relaxed load suffices: the value is only an advisory snapshot.
        self.rr_offset.load(Ordering::Relaxed)
    }
140
141    /// Open a connection to `target` through the proxy chain.
142    ///
143    /// If the target matches a `localnet` rule, a direct connection is made.
144    /// DNAT rewrites are applied before localnet and proxy selection.
145    ///
146    /// For `Random` and `RoundRobin` chains, the selection is retried up to
147    /// `chain_retries` additional times (with a fresh random/rotating slice
148    /// each attempt) before giving up.
149    ///
150    /// Returns a [`BoxStream`] so that TLS-wrapped hops (HTTPS proxies) and
151    /// plain TCP hops share the same return type.
152    pub async fn connect(&self, target: Target) -> Result<BoxStream> {
153        let target = self.apply_dnat(target);
154
155        if self.is_localnet(&target) {
156            return self.direct_connect(&target).await;
157        }
158
159        // Strict / Dynamic iterate all proxies internally, so a second attempt
160        // would repeat the same sequence — no benefit.
161        let max_attempts = match self.config.chain_type {
162            ChainType::Strict | ChainType::Dynamic => 1,
163            ChainType::Random | ChainType::RoundRobin => 1 + self.config.chain_retries,
164        };
165
166        let mut last_err = anyhow::anyhow!("no proxies configured");
167        for attempt in 0..max_attempts {
168            let result = match self.config.chain_type {
169                ChainType::Strict => self.connect_strict(target.clone()).await,
170                ChainType::Dynamic => self.connect_dynamic(target.clone()).await,
171                ChainType::Random => self.connect_random(target.clone()).await,
172                ChainType::RoundRobin => self.connect_round_robin(target.clone()).await,
173            };
174            match result {
175                Ok(s) => return Ok(s),
176                Err(e) => {
177                    if attempt + 1 < max_attempts {
178                        tracing::debug!(
179                            "chain attempt {} failed ({e:#}), retrying with new selection",
180                            attempt + 1
181                        );
182                    }
183                    last_err = e;
184                }
185            }
186        }
187        Err(last_err)
188    }
189
190    // ── Chain modes ───────────────────────────────────────────────────────────
191
192    async fn connect_strict(&self, target: Target) -> Result<BoxStream> {
193        // All proxies must be online — bypass the dead-proxy pre-filter so that
194        // a historically-marked proxy is still attempted (and causes failure if
195        // it truly is down).
196        let refs: Vec<&ProxyEntry> = self.config.proxies.iter().collect();
197        if refs.is_empty() {
198            bail!("strict_chain: no proxies configured");
199        }
200        // Single fixed attempt through every proxy in order — no fallback.
201        self.try_proxy_chain(&refs, target, "strict")
202            .await
203            .context("strict_chain")
204    }
205
206    async fn connect_dynamic(&self, target: Target) -> Result<BoxStream> {
207        let refs = self.live_proxy_refs();
208        if refs.is_empty() {
209            bail!("dynamic_chain: no proxies configured");
210        }
211        // All live proxies are chained in order.  When the first-hop TCP connect
212        // fails the anchor advances by one, skipping that dead proxy while keeping
213        // every remaining proxy in the chain.  At least one proxy must be reachable.
214        let mut last_err = anyhow::anyhow!("all proxies unreachable");
215        for anchor in 0..refs.len() {
216            let window = &refs[anchor..];
217            match self.try_proxy_chain(window, target.clone(), "dynamic").await {
218                Ok(s) => return Ok(s),
219                Err(e) => last_err = e,
220            }
221        }
222        Err(last_err).context("dynamic_chain")
223    }
224
225    async fn connect_random(&self, target: Target) -> Result<BoxStream> {
226        let chain_len = self.config.chain_len.max(1);
227        let pool = self.live_proxy_refs();
228        if pool.len() < chain_len {
229            bail!(
230                "random_chain: need {chain_len} proxies, only {} available (live)",
231                pool.len()
232            );
233        }
234        // OsRng reads directly from the OS entropy source (getrandom) on every
235        // call, guaranteeing different selections across runs and across
236        // connections within a run.  It is also Send, so the future remains
237        // Send without any drop-before-await gymnastics.
238        let mut selected = pool;
239        selected.shuffle(&mut OsRng);
240        selected.truncate(chain_len);
241        self.try_proxy_chain(&selected, target, "random")
242            .await
243            .context("random_chain")
244    }
245
246    async fn connect_round_robin(&self, target: Target) -> Result<BoxStream> {
247        let chain_len = self.config.chain_len.max(1);
248        let pool = self.live_proxy_refs();
249        if pool.is_empty() {
250            bail!("round_robin_chain: no proxies");
251        }
252        let n = pool.len();
253        let offset = self.rr_offset.fetch_add(chain_len, Ordering::SeqCst) % n;
254        let selected: Vec<&ProxyEntry> = (0..chain_len).map(|i| pool[(offset + i) % n]).collect();
255        self.try_proxy_chain(&selected, target, "round-robin")
256            .await
257            .context("round_robin_chain")
258    }
259
260    // ── Helpers ───────────────────────────────────────────────────────────────
261
262    /// Apply DNAT rewrite rules to `target`.
263    ///
264    /// Returns the rewritten target, or `target` unchanged if no rule matches.
265    pub fn apply_dnat(&self, target: Target) -> Target {
266        if let Target::Ip(IpAddr::V4(ip), port) = &target {
267            for rule in &self.config.dnats {
268                if rule.orig_addr == *ip {
269                    if let Some(orig_port) = rule.orig_port {
270                        if orig_port != *port {
271                            continue;
272                        }
273                    }
274                    let new_port = rule.new_port.unwrap_or(*port);
275                    return Target::Ip(IpAddr::V4(rule.new_addr), new_port);
276                }
277            }
278        }
279        target
280    }
281
282    /// Check whether `target` matches any localnet exclusion rule.
283    ///
284    /// Returns `true` when the connection should bypass the proxy chain and
285    /// connect directly.
286    pub fn is_localnet(&self, target: &Target) -> bool {
287        let (ip, port) = match target {
288            Target::Ip(ip, p) => (Some(*ip), *p),
289            Target::Host(_, p) => (None, *p),
290        };
291        let Some(ip) = ip else { return false };
292
293        for ln in &self.config.localnets {
294            if let Some(p) = ln.port {
295                if p != port {
296                    continue;
297                }
298            }
299            match (ip, ln.addr) {
300                (IpAddr::V4(tip), IpAddr::V4(laddr)) => {
301                    if let Some(mask) = ln.mask_v4 {
302                        let t = u32::from(tip);
303                        let l = u32::from(laddr);
304                        let m = u32::from(mask);
305                        if (t & m) == (l & m) {
306                            return true;
307                        }
308                    }
309                }
310                (IpAddr::V6(tip), IpAddr::V6(laddr)) => {
311                    if let Some(prefix) = ln.prefix_v6 {
312                        if ipv6_match(tip, laddr, prefix) {
313                            return true;
314                        }
315                    }
316                }
317                _ => {}
318            }
319        }
320        false
321    }
322
323    /// Return references to all proxies that have not been marked dead.
324    ///
325    /// Falls back to the full proxy list if every proxy is dead, so that
326    /// callers always have at least one candidate to try.
327    pub fn live_proxy_refs(&self) -> Vec<&ProxyEntry> {
328        let threshold = self.config.proxy_dead_threshold;
329        if threshold == 0 {
330            return self.config.proxies.iter().collect();
331        }
332        let counts = self
333            .failure_counts
334            .read()
335            .expect("failure_counts RwLock poisoned");
336        let live: Vec<&ProxyEntry> = self
337            .config
338            .proxies
339            .iter()
340            .filter(|p| counts.get(&(p.addr, p.port)).copied().unwrap_or(0) < threshold)
341            .collect();
342        if live.is_empty() {
343            // All proxies exceeded the threshold; reset by returning the full list
344            // so the engine can keep working rather than silently failing forever.
345            tracing::debug!("all proxies marked dead — using full list as fallback");
346            self.config.proxies.iter().collect()
347        } else {
348            live
349        }
350    }
351
352    /// Increment the failure counter for `proxy`.  No-op when `proxy_dead_threshold == 0`.
353    ///
354    /// Callers may use this to pre-populate the dead-proxy state (e.g. from
355    /// persistent storage between process restarts) or to manually evict a
356    /// known-bad proxy.
357    pub fn record_failure(&self, proxy: &ProxyEntry) {
358        if self.config.proxy_dead_threshold == 0 {
359            return;
360        }
361        let mut counts = self
362            .failure_counts
363            .write()
364            .expect("failure_counts RwLock poisoned");
365        *counts.entry((proxy.addr, proxy.port)).or_insert(0) += 1;
366        let new_count = counts[&(proxy.addr, proxy.port)];
367        if new_count >= self.config.proxy_dead_threshold {
368            tracing::debug!(
369                "proxy {}:{} marked dead after {new_count} failures",
370                proxy.addr,
371                proxy.port
372            );
373        }
374    }
375
376    /// Attempt a single proxy chain through a pre-selected `window` of proxies.
377    ///
378    /// Prints each hop to stderr immediately before attempting it, so the user
379    /// sees incremental progress in real time.  On failure the error marker is
380    /// appended to the same line before the newline is flushed.
381    async fn try_proxy_chain(
382        &self,
383        window: &[&ProxyEntry],
384        target: Target,
385        label: &str,
386    ) -> Result<BoxStream> {
387        debug_assert!(!window.is_empty());
388
389        // Print label and first hop before the TCP connect attempt.
390        eprint!(
391            "[proxychains-tun] {label} - {}:{}",
392            window[0].addr, window[0].port
393        );
394        let _ = std::io::stderr().flush();
395
396        let stream = match self.tcp_connect(window[0].addr, window[0].port).await {
397            Ok(s) => s,
398            Err(e) => {
399                eprintln!(" <--socket error or timeout!");
400                tracing::debug!(
401                    "{label} tcp connect to {}:{} failed: {e:#}",
402                    window[0].addr, window[0].port
403                );
404                self.record_failure(window[0]);
405                return Err(e);
406            }
407        };
408
409        match chain_from(stream, window, target, self.config.tls_skip_verify).await {
410            Ok(s) => Ok(s),
411            Err(e) => {
412                tracing::debug!("{label} handshake error: {e:#}");
413                self.record_failure(window[0]);
414                Err(e)
415            }
416        }
417    }
418
419    async fn tcp_connect(&self, addr: IpAddr, port: u16) -> Result<BoxStream> {
420        let stream = timeout(
421            self.config.connect_timeout,
422            TcpStream::connect((addr, port)),
423        )
424        .await
425        .context("tcp connect timed out")?
426        .context("tcp connect failed")?;
427        Ok(Box::new(stream))
428    }
429
430    async fn direct_connect(&self, target: &Target) -> Result<BoxStream> {
431        let stream = match target {
432            Target::Ip(ip, port) => timeout(
433                self.config.connect_timeout,
434                TcpStream::connect((*ip, *port)),
435            )
436            .await
437            .context("direct connect timed out")?
438            .context("direct connect failed")?,
439            Target::Host(h, p) => timeout(
440                self.config.connect_timeout,
441                TcpStream::connect(format!("{h}:{p}").as_str()),
442            )
443            .await
444            .context("direct connect timed out")?
445            .context("direct connect failed")?,
446        };
447        Ok(Box::new(stream))
448    }
449
450}
451
/// Report whether `a` and `b` agree on their first `prefix` bits.
///
/// * `prefix == 0` matches any pair of addresses.
/// * `prefix == 128` requires the addresses to be identical.
/// * `prefix > 128` is not a valid IPv6 prefix length; such a rule never
///   matches (previously this panicked on an out-of-range slice index).
fn ipv6_match(a: Ipv6Addr, b: Ipv6Addr, prefix: u8) -> bool {
    if prefix > 128 {
        return false;
    }
    if prefix == 0 {
        // Shifting a u128 by 128 bits would overflow, so handle /0 here.
        return true;
    }
    // Keep the top `prefix` bits; the shift amount is within 0..=127.
    let mask = u128::MAX << (128 - u32::from(prefix));
    (u128::from(a) & mask) == (u128::from(b) & mask)
}
468
469/// Build a [`Target`] pointing to `proxy`'s address (used as intermediate hop).
470fn hop_target(proxy: &ProxyEntry) -> Target {
471    Target::Ip(proxy.addr, proxy.port)
472}
473
474/// Perform the appropriate protocol handshake for `prev_proxy` to connect to `next_target`.
475async fn handshake(
476    stream: BoxStream,
477    prev_proxy: &ProxyEntry,
478    next_target: Target,
479    tls_skip_verify: bool,
480) -> Result<BoxStream> {
481    let user = prev_proxy.username.as_deref();
482    let pass = prev_proxy.password.as_deref();
483    match prev_proxy.proxy_type {
484        ProxyType::Socks4 => socks4::connect(stream, &next_target, user).await,
485        ProxyType::Socks5 => socks5::connect(stream, &next_target, user, pass).await,
486        ProxyType::Http => http::connect(stream, &next_target, user, pass).await,
487        ProxyType::Https => {
488            https::connect(stream, &next_target, user, pass, prev_proxy.addr, tls_skip_verify)
489                .await
490        }
491        ProxyType::Raw => raw::connect(stream, &next_target).await,
492    }
493}
494
495/// Drive a pre-selected proxy window to `target`.
496///
497/// `window[0]` is already TCP-connected and its address was printed by the
498/// caller.  This function prints each subsequent hop to stderr *before*
499/// attempting the handshake, then appends `OK` or `<--socket error or
500/// timeout!` and a newline once the outcome is known.
501async fn chain_from(
502    mut stream: BoxStream,
503    window: &[&ProxyEntry],
504    target: Target,
505    tls_skip_verify: bool,
506) -> Result<BoxStream> {
507    for i in 0..window.len() - 1 {
508        // Print the next hop before attempting the handshake to reach it.
509        eprint!(" - {}:{}", window[i + 1].addr, window[i + 1].port);
510        let _ = std::io::stderr().flush();
511        match handshake(stream, window[i], hop_target(window[i + 1]), tls_skip_verify).await {
512            Ok(s) => stream = s,
513            Err(e) => {
514                eprintln!(" <--socket error or timeout!");
515                return Err(e);
516            }
517        }
518    }
519
520    // Final handshake: the last proxy connects us to the actual target.
521    eprint!(" - {target}");
522    let _ = std::io::stderr().flush();
523    match handshake(stream, window[window.len() - 1], target, tls_skip_verify).await {
524        Ok(s) => {
525            eprintln!(" OK");
526            Ok(s)
527        }
528        Err(e) => {
529            eprintln!(" <--socket error or timeout!");
530            Err(e)
531        }
532    }
533}