hyper_util/client/legacy/connect/proxy/socks/v5/
mod.rs1mod errors;
2pub use errors::*;
3
4mod messages;
5use messages::*;
6
7use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
8use std::task::{Context, Poll};
9
10use http::Uri;
11use hyper::rt::{Read, Write};
12use tower_service::Service;
13
14use bytes::BytesMut;
15
16use super::{Handshaking, SocksError};
17
18#[derive(Debug, Clone)]
24pub struct SocksV5<C> {
25 inner: C,
26 config: SocksConfig,
27}
28
29#[derive(Debug, Clone)]
30pub struct SocksConfig {
31 proxy: Uri,
32 proxy_auth: Option<(String, String)>,
33
34 local_dns: bool,
35 optimistic: bool,
36}
37
/// Steps of the handshake state machine driven by `SocksConfig::execute`.
///
/// Each phase (method negotiation, authentication, proxy request) has one
/// "sending" step that writes a request and one "reading" step that consumes
/// the matching response.
#[derive(Debug)]
enum State {
    /// Write the method-negotiation request.
    SendingNegReq,
    /// Read the method-negotiation response.
    ReadingNegRes,
    /// Write the user/pass authentication request.
    SendingAuthReq,
    /// Read the authentication response.
    ReadingAuthRes,
    /// Write the proxy (connect) request for the destination address.
    SendingProxyReq,
    /// Read the proxy response; this step always ends the handshake.
    ReadingProxyRes,
}
47
48impl<C> SocksV5<C> {
49 pub fn new(proxy_dst: Uri, connector: C) -> Self {
58 Self {
59 inner: connector,
60 config: SocksConfig::new(proxy_dst),
61 }
62 }
63
64 pub fn with_auth(mut self, user: String, pass: String) -> Self {
71 self.config.proxy_auth = Some((user, pass));
72 self
73 }
74
75 pub fn local_dns(mut self, local_dns: bool) -> Self {
80 self.config.local_dns = local_dns;
81 self
82 }
83
84 pub fn send_optimistically(mut self, optimistic: bool) -> Self {
94 self.config.optimistic = optimistic;
95 self
96 }
97}
98
99impl SocksConfig {
100 fn new(proxy: Uri) -> Self {
101 Self {
102 proxy,
103 proxy_auth: None,
104
105 local_dns: false,
106 optimistic: false,
107 }
108 }
109
110 async fn execute<T, E>(self, mut conn: T, host: String, port: u16) -> Result<T, SocksError<E>>
111 where
112 T: Read + Write + Unpin,
113 {
114 let address = match host.parse::<IpAddr>() {
115 Ok(ip) => Address::Socket(SocketAddr::new(ip, port)),
116 Err(_) if host.len() <= 255 => {
117 if self.local_dns {
118 let socket = (host, port)
119 .to_socket_addrs()?
120 .next()
121 .ok_or(SocksError::DnsFailure)?;
122
123 Address::Socket(socket)
124 } else {
125 Address::Domain(host, port)
126 }
127 }
128 Err(_) => return Err(SocksV5Error::HostTooLong.into()),
129 };
130
131 let method = if self.proxy_auth.is_some() {
132 AuthMethod::UserPass
133 } else {
134 AuthMethod::NoAuth
135 };
136
137 let mut recv_buf = BytesMut::with_capacity(513); let mut send_buf = BytesMut::with_capacity(262); let mut state = State::SendingNegReq;
140
141 loop {
142 match state {
143 State::SendingNegReq => {
144 let req = NegotiationReq(&method);
145
146 let start = send_buf.len();
147 req.write_to_buf(&mut send_buf)?;
148 crate::rt::write_all(&mut conn, &send_buf[start..]).await?;
149
150 if self.optimistic {
151 if method == AuthMethod::UserPass {
152 state = State::SendingAuthReq;
153 } else {
154 state = State::SendingProxyReq;
155 }
156 } else {
157 state = State::ReadingNegRes;
158 }
159 }
160
161 State::ReadingNegRes => {
162 let res: NegotiationRes = super::read_message(&mut conn, &mut recv_buf).await?;
163
164 if res.0 == AuthMethod::NoneAcceptable {
165 return Err(SocksV5Error::Auth(AuthError::Unsupported).into());
166 }
167
168 if res.0 != method {
169 return Err(SocksV5Error::Auth(AuthError::MethodMismatch).into());
170 }
171
172 if self.optimistic {
173 if res.0 == AuthMethod::UserPass {
174 state = State::ReadingAuthRes;
175 } else {
176 state = State::ReadingProxyRes;
177 }
178 } else if res.0 == AuthMethod::UserPass {
179 state = State::SendingAuthReq;
180 } else {
181 state = State::SendingProxyReq;
182 }
183 }
184
185 State::SendingAuthReq => {
186 let (user, pass) = self.proxy_auth.as_ref().unwrap();
187 let req = AuthenticationReq(user, pass);
188
189 let start = send_buf.len();
190 req.write_to_buf(&mut send_buf)?;
191 crate::rt::write_all(&mut conn, &send_buf[start..]).await?;
192
193 if self.optimistic {
194 state = State::SendingProxyReq;
195 } else {
196 state = State::ReadingAuthRes;
197 }
198 }
199
200 State::ReadingAuthRes => {
201 let res: AuthenticationRes =
202 super::read_message(&mut conn, &mut recv_buf).await?;
203
204 if !res.0 {
205 return Err(SocksV5Error::Auth(AuthError::Failed).into());
206 }
207
208 if self.optimistic {
209 state = State::ReadingProxyRes;
210 } else {
211 state = State::SendingProxyReq;
212 }
213 }
214
215 State::SendingProxyReq => {
216 let req = ProxyReq(&address);
217
218 let start = send_buf.len();
219 req.write_to_buf(&mut send_buf)?;
220 crate::rt::write_all(&mut conn, &send_buf[start..]).await?;
221
222 if self.optimistic {
223 state = State::ReadingNegRes;
224 } else {
225 state = State::ReadingProxyRes;
226 }
227 }
228
229 State::ReadingProxyRes => {
230 let res: ProxyRes = super::read_message(&mut conn, &mut recv_buf).await?;
231
232 if res.0 == Status::Success {
233 return Ok(conn);
234 } else {
235 return Err(SocksV5Error::Command(res.0).into());
236 }
237 }
238 }
239 }
240 }
241}
242
243impl<C> Service<Uri> for SocksV5<C>
244where
245 C: Service<Uri>,
246 C::Future: Send + 'static,
247 C::Response: Read + Write + Unpin + Send + 'static,
248 C::Error: Send + 'static,
249{
250 type Response = C::Response;
251 type Error = SocksError<C::Error>;
252 type Future = Handshaking<C::Future, C::Response, C::Error>;
253
254 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
255 self.inner.poll_ready(cx).map_err(SocksError::Inner)
256 }
257
258 fn call(&mut self, dst: Uri) -> Self::Future {
259 let config = self.config.clone();
260 let connecting = self.inner.call(config.proxy.clone());
261
262 let fut = async move {
263 let port = dst.port().map(|p| p.as_u16()).unwrap_or(443);
264 let host = dst.host().ok_or(SocksError::MissingHost)?.to_string();
265
266 let conn = connecting.await.map_err(SocksError::Inner)?;
267 config.execute(conn, host, port).await
268 };
269
270 Handshaking {
271 fut: Box::pin(fut),
272 _marker: Default::default(),
273 }
274 }
275}