async_proxy/clients/socks4/
general.rs1use crate::general::ConnectionTimeouts;
2use crate::proxy::ProxyConstructor;
3use crate::clients::socks4::{ErrorKind, Command};
4use byteorder::{ByteOrder, BigEndian};
5use tokio::net::TcpStream;
6use tokio::io::{AsyncRead, AsyncWrite};
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::time::timeout;
9use std::pin::Pin;
10use core::task::{Poll, Context};
11use std::net::SocketAddrV4;
12use std::str::FromStr;
13use std::borrow::Cow;
14use std::io;
15
16pub struct Socks4General {
20 dest_addr: SocketAddrV4,
23 ident: Cow<'static, str>,
26 timeouts: ConnectionTimeouts
28}
29
/// Errors produced while parsing a `Socks4General` from a string.
///
/// The type implements `Display` and `std::error::Error`, so it can be
/// propagated with `?` into `Box<dyn Error>` and printed for users.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StrParsingError {
    /// A required space-separated field is missing from the input.
    SyntaxError,
    /// The destination field is not a valid `SocketAddrV4`.
    InvalidAddr,
    /// The timeouts field could not be parsed.
    InvalidTimeouts,
}

impl std::fmt::Display for StrParsingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            StrParsingError::SyntaxError => "a required field is missing",
            StrParsingError::InvalidAddr => "the destination address is not a valid IPv4 socket address",
            StrParsingError::InvalidTimeouts => "the timeouts could not be parsed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for StrParsingError {}
46
47pub struct S4GeneralStream {
51 wrapped_stream: TcpStream
54}
55
56impl Socks4General {
57 pub fn new(dest_addr: SocketAddrV4, ident: Cow<'static, str>,
58 timeouts: ConnectionTimeouts)
59 -> Socks4General
60 {
61 Socks4General { dest_addr, ident, timeouts }
62 }
63}
64
65impl FromStr for Socks4General {
68 type Err = StrParsingError;
69
70 fn from_str(s: &str) -> Result<Socks4General, Self::Err> {
74 let mut s = s.split(" ");
76
77 let (address, ident, timeouts) = (s.next()
79 .ok_or(StrParsingError::SyntaxError)?
80 .parse::<SocketAddrV4>()
81 .map_err(|_| StrParsingError::InvalidAddr)?,
82 s.next()
83 .ok_or(StrParsingError::SyntaxError)?,
84 s.next()
85 .ok_or(StrParsingError::SyntaxError)?
86 .parse::<ConnectionTimeouts>()
87 .map_err(|_| StrParsingError::InvalidTimeouts)?);
88
89 Ok(Socks4General::new(address, Cow::Owned(ident.to_owned()), timeouts))
90 }
91}
92
93#[async_trait::async_trait]
94impl ProxyConstructor for Socks4General {
95 type ProxyStream = S4GeneralStream;
96 type Stream = TcpStream;
97 type ErrorKind = ErrorKind;
98
99 async fn connect(&mut self, mut stream: Self::Stream)
100 -> Result<Self::ProxyStream, Self::ErrorKind>
101 {
102 let buf_len = 1 + 1 + 2 + 4 + self.ident.len() + 1;
111 let mut buf = Vec::with_capacity(buf_len);
113
114 buf.push(4);
117
118 buf.push(Command::TcpConnectionEstablishment as u8);
120
121 buf.push(0);
125 buf.push(0);
126
127 BigEndian::write_u16(&mut buf[2..4], self.dest_addr.port());
129
130 buf.push(0);
134 buf.push(0);
135 buf.push(0);
136 buf.push(0);
137
138 BigEndian::write_u32(&mut buf[4..8], (*self.dest_addr.ip()).into());
140
141 buf.push(0);
144
145 let future = stream.write_all(&buf);
148 let future = timeout(self.timeouts.write_timeout, future);
149 let _ = future.await.map_err(|_| ErrorKind::OperationTimeoutReached)?
150 .map_err(|e| ErrorKind::IOError(e))?;
151
152 let future = stream.read(&mut buf);
154 let future = timeout(self.timeouts.read_timeout, future);
155 let read_bytes = future.await.map_err(|_| ErrorKind::OperationTimeoutReached)?
156 .map_err(|e| ErrorKind::IOError(e))?;
157
158 if read_bytes != 8 {
162 return Err(ErrorKind::BadBuffer)
163 }
164
165 match buf[1] {
169 0x5a => Ok(S4GeneralStream { wrapped_stream: stream }),
171 0x5b => Err(ErrorKind::RequestDenied),
173 0x5c => Err(ErrorKind::IdentIsUnavailable),
175 0x5d => Err(ErrorKind::BadIdent),
177 _ => Err(ErrorKind::BadBuffer)
180 }
181 }
182}
183
184impl AsyncRead for S4GeneralStream {
185 fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
186 -> Poll<io::Result<usize>>
187 {
188 let pinned = &mut Pin::into_inner(self).wrapped_stream;
189 Pin::new(pinned).poll_read(cx, buf)
190 }
191}
192
193impl AsyncWrite for S4GeneralStream {
194 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
195 -> Poll<Result<usize, io::Error>>
196 {
197 let stream = &mut Pin::into_inner(self).wrapped_stream;
198 Pin::new(stream).poll_write(cx, buf)
199 }
200
201 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>)
202 -> Poll<Result<(), io::Error>>
203 {
204 let stream = &mut Pin::into_inner(self).wrapped_stream;
205 Pin::new(stream).poll_flush(cx)
206 }
207
208 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>)
209 -> Poll<Result<(), io::Error>>
210 {
211 let stream = &mut Pin::into_inner(self).wrapped_stream;
212 Pin::new(stream).poll_shutdown(cx)
213 }
214}
215
216impl Into<TcpStream> for S4GeneralStream {
217 fn into(self) -> TcpStream {
218 self.wrapped_stream
219 }
220}