phala_tokio_proxy/
lib.rs

1use bytes::{BufMut, BytesMut};
2use log::*;
3use std::io;
4use std::net::{IpAddr, SocketAddr};
5use std::str::FromStr;
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::net::lookup_host;
8use tokio::net::TcpStream;
9use tokio::net::ToSocketAddrs;
10use url::Url;
11
/// Extension of [`ToSocketAddrs`] for address types that can also be rendered as text.
///
/// `connect` keeps the rendered text so that, for the `socks5h` scheme, the
/// target can be handed to the proxy unresolved.
pub trait ToSocketAddrsExt: ToSocketAddrs {
    /// Returns the address as a `String`.
    ///
    /// The tuple implementations in this file produce `host:port`; the other
    /// implementations return the value's own string form.
    fn to_string2(&self) -> String;
}
15
16impl ToSocketAddrsExt for SocketAddr {
17    fn to_string2(&self) -> String {
18        self.to_string()
19    }
20}
21
22impl ToSocketAddrsExt for str {
23    fn to_string2(&self) -> String {
24        self.to_string()
25    }
26}
27
28impl ToSocketAddrsExt for &str {
29    fn to_string2(&self) -> String {
30        self.to_string()
31    }
32}
33
34impl ToSocketAddrsExt for String {
35    fn to_string2(&self) -> String {
36        self.to_string()
37    }
38}
39
40impl ToSocketAddrsExt for (&'_ str, u16) {
41    fn to_string2(&self) -> String {
42        format!("{}:{}", self.0, self.1)
43    }
44}
45
46impl ToSocketAddrsExt for (IpAddr, u16) {
47    fn to_string2(&self) -> String {
48        format!("{}:{}", self.0, self.1)
49    }
50}
51
/// SOCKS5 address-type byte for an IPv4 address (4 address bytes follow).
const ADDR_TYPE_IPV4ADDR: u8 = 0x01;
/// SOCKS5 address-type byte for a domain name (one length byte, then the name bytes).
const ADDR_TYPE_DOMAINNAME: u8 = 0x03;
/// SOCKS5 address-type byte for an IPv6 address (16 address bytes follow).
const ADDR_TYPE_IPV6ADDR: u8 = 0x04;
55
56fn put_ip(ip: &IpAddr, buf: &mut BytesMut) {
57    match ip {
58        IpAddr::V4(ip) => {
59            buf.put_u8(ADDR_TYPE_IPV4ADDR);
60            buf.put(&ip.octets()[..]);
61        }
62        IpAddr::V6(ip) => {
63            buf.put_u8(ADDR_TYPE_IPV6ADDR);
64            buf.put(&ip.octets()[..]);
65        }
66    };
67}
68
69pub async fn connect<A: ToSocketAddrsExt, T: AsRef<str>>(
70    addr: A,
71    proxy_url: T,
72) -> io::Result<TcpStream> {
73    let addr2 = addr.to_string2();
74    let proxy_url = match Url::parse(proxy_url.as_ref()) {
75        Ok(url) => url,
76        Err(err) => {
77            return Err(io::Error::new(
78                io::ErrorKind::Other,
79                format!("invalid socks5 url: {:?}", err),
80            ));
81        }
82    };
83
84    let mut addrs = Vec::new();
85    let max_tries = match proxy_url.scheme() {
86        "socks5" => {
87            addrs = lookup_host(addr).await?.collect();
88            addrs.len()
89        }
90        "socks5h" => 1,
91        _ => {
92            return Err(io::Error::new(
93                io::ErrorKind::Other,
94                "invalid socks5 url: unsupported scheme",
95            ));
96        }
97    };
98
99    let mut buf = BytesMut::with_capacity(512);
100    let proxy_addr = (
101        proxy_url.host_str().ok_or(io::Error::new(
102            io::ErrorKind::Other,
103            "invalid socks5 url, no host",
104        ))?,
105        proxy_url.port().ok_or(io::Error::new(
106            io::ErrorKind::Other,
107            "invalid socks5 url, no port",
108        ))?,
109    );
110
111    for addr_idx in 0..max_tries {
112        trace!("connect socks5 server: {:?}", proxy_addr);
113        let mut s = TcpStream::connect(&proxy_addr).await?;
114
115        s.write_all(b"\x05\x01\x00").await?;
116        buf.resize(2, 0);
117        s.read_exact(&mut buf).await?;
118        if buf[1] != 0 {
119            return Err(io::Error::new(
120                io::ErrorKind::Other,
121                "unsupported authentication method",
122            ));
123        }
124
125        buf.clear();
126        buf.put(&b"\x05\x01\x00"[..]);
127        match proxy_url.scheme() {
128            "socks5" => {
129                let addr = addrs[addr_idx];
130                trace!("connect {}", addr);
131                let ip = addr.ip();
132                put_ip(&ip, &mut buf);
133                buf.put(addr.port().to_be_bytes().as_ref());
134            }
135            "socks5h" => {
136                let addr = addr2.clone();
137                trace!("connect {}", addr);
138                let idx = addr.rfind(':').unwrap();
139                let host = String::from(&addr[..idx]);
140                let port = addr[idx + 1..].parse::<u16>().unwrap();
141                match IpAddr::from_str(&host) {
142                    Ok(ip) => {
143                        put_ip(&ip, &mut buf);
144                    }
145                    Err(_) => {
146                        if host.len() > 255 {
147                            return Err(io::Error::new(io::ErrorKind::Other, "FQDN too long"));
148                        }
149                        buf.put_u8(ADDR_TYPE_DOMAINNAME);
150                        buf.put_u8(host.len() as u8);
151                        buf.put(host.as_bytes());
152                    }
153                }
154                buf.put(port.to_be_bytes().as_ref());
155            }
156            _ => {
157                return Err(io::Error::new(
158                    io::ErrorKind::Other,
159                    "invalid socks5 url: unsupported scheme",
160                ));
161            }
162        }
163
164        s.write_all(&buf[..]).await?;
165
166        buf.resize(4, 0);
167        let mut len = s.read_exact(&mut buf).await?;
168
169        let reply = buf[1];
170        if reply != 0 {
171            error!("socks5 connect command failed: reply={}", reply);
172            continue;
173        }
174
175        let addr_len = match buf[3] {
176            ADDR_TYPE_IPV4ADDR => 4,
177            ADDR_TYPE_IPV6ADDR => 16,
178            ADDR_TYPE_DOMAINNAME => {
179                buf.resize(5, 0);
180                len += s.read_exact(&mut buf[4..]).await?;
181                (buf[4] + 1) as usize
182            }
183            _ => {
184                return Err(io::Error::new(io::ErrorKind::Other, "unknown address type"));
185            }
186        };
187        let should_len = 4 + addr_len + 2;
188        buf.resize(should_len, 0);
189        s.read_exact(&mut buf[len..]).await?;
190
191        return Ok(s);
192    }
193
194    Err(io::Error::new(io::ErrorKind::Other, "failed to connect"))
195}