async_proxies/proxies/socks4/
general.rs

1use crate::proxy::AsyncProxy;
2use async_trait::async_trait;
3
4use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
5use tokio::net::TcpStream;
6use tokio::time::timeout;
7
8use std::net::SocketAddr;
9use std::net::SocketAddrV4;
10use std::fmt::{Formatter, Display, Error};
11use std::time::Duration;
12
13pub struct Socks4General;
14
15pub enum ErrorKind {
16    ConnectionFailed,
17    RawStreamIOFailed(io::Error),
18    GotBadBuffer,
19    RequestDenied,
20    IdentIsUnavailable,
21    BadIdent,
22    OperationTimeoutReached
23}
24
25#[repr(u8)]
26pub enum Command {
27    TcpConnectionEstablishment = 1,
28    TcpPortBinding = 2
29}
30
31pub struct Timeouts {
32    connection_timeout: Duration,
33    write_timeout: Duration,
34    read_timeout: Duration
35}
36
37pub struct ConnParams {
38    // the address of a proxy
39    socks_addr: SocketAddr,
40    // the address of a destination service
41    dest_addr: SocketAddrV4,
42    ident: String,
43    // the way to establish connection (bind a port/connect)
44    command: Command,
45    // just timeouts
46    timeouts: Timeouts,
47}
48
49impl Timeouts {
50    pub fn new(connection_timeout: Duration,
51               write_timeout: Duration,
52               read_timeout: Duration)
53        -> Timeouts
54    {
55        Timeouts { connection_timeout, write_timeout, read_timeout }
56    }
57}
58
59impl Display for ErrorKind {
60    fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
61        match self {
62            ErrorKind::ConnectionFailed => f.write_str("connection failed"),
63            ErrorKind::RawStreamIOFailed(e) 
64                => f.write_str(&format!("I/O error: {}", e)),
65            ErrorKind::GotBadBuffer => f.write_str("bad buffer has been received"),
66            ErrorKind::RequestDenied => f.write_str("request denied"),
67            ErrorKind::IdentIsUnavailable => f.write_str("ident is unavailable"),
68            ErrorKind::BadIdent => f.write_str("bad ident"),
69            ErrorKind::OperationTimeoutReached => f.write_str("operation timeout reached")
70        }
71    }
72}
73
74impl ConnParams {
75    pub fn new(socks_addr: SocketAddr, dest_addr: SocketAddrV4,
76        ident: String, command: Command, timeouts: Timeouts)
77        -> ConnParams
78    {
79        ConnParams { socks_addr, dest_addr, ident, command, timeouts }
80    }
81}
82
83#[async_trait]
84impl AsyncProxy for Socks4General {
85    type OutputStream = TcpStream;
86    type ErrorKind = ErrorKind;
87    type ConnParams = ConnParams;
88
89    async fn connect(&mut self, params: Self::ConnParams)
90        -> Result<Self::OutputStream, Self::ErrorKind>
91    {
92        // Connecting to the proxy itself
93        let future = TcpStream::connect(params.socks_addr);
94        let future = timeout(params.timeouts.connection_timeout, future);
95        // You also can see that this code template
96        // (with creating a future and wrapping it with a timeout)
97        // repeats many times in this code. The repetitions
98        // will be ofc reduced, don't worry.
99        let stream = future.await
100                           .map_err(|_| ErrorKind::OperationTimeoutReached)?
101                           .map_err(|_| ErrorKind::ConnectionFailed)?;
102        
103        // Buffer length in bytes is determined this way:
104        //  (+1) for the number of the version of the socks protocol (4 in this case)
105        //  (+1) for the command number (1 or 2)
106        //  (+2) for port (in the network byte order)
107        //  (+4) for the IPv4 address
108        //  (+n) where `n` is the length of the given ident
109        //  (+1) for the NULL-termination byte (0x00)
110        let buffer_len = 1 + 1 + 2 + 4 + params.ident.len() + 1;
111        let mut buffer = Vec::with_capacity(buffer_len);
112
113        // the version of the socks protocol being used
114        buffer.push(4);
115        // the command number
116        buffer.push(params.command as u8);
117            
118        let port_in_bytes = params.dest_addr.port().to_be_bytes();
119        // the destination port
120        buffer.extend_from_slice(&port_in_bytes[..]);
121
122        let ipaddr_in_bytes = params.dest_addr.ip().octets();
123        // the ipv4 address of the destination host
124        buffer.extend_from_slice(&ipaddr_in_bytes[..]);
125
126        // the given ident
127        buffer.extend_from_slice(&params.ident.as_bytes());
128        // and, finally, the NULL-termination byte
129        buffer.push(0);
130            
131        let future = self.write_buffer(&buffer[..], stream);
132        let future = timeout(params.timeouts.write_timeout, future);
133        let mut stream = future.await
134                               .map_err(|_| ErrorKind::OperationTimeoutReached)??;
135
136        let future = stream.read(&mut buffer[..]);
137        let future = timeout(params.timeouts.read_timeout, future);
138        let read_bytes = future.await
139                               .map_err(|_| ErrorKind::OperationTimeoutReached)?
140                               .map_err(|e| ErrorKind::RawStreamIOFailed(e))?;
141
142        if read_bytes < 2 {
143            return Err(ErrorKind::GotBadBuffer)
144        }
145
146        match buffer[1] {
147            0x5a => Ok(stream),
148            0x5b => Err(ErrorKind::RequestDenied),
149            0x5c => Err(ErrorKind::IdentIsUnavailable),
150            0x5d => Err(ErrorKind::BadIdent),
151            _ => Err(ErrorKind::GotBadBuffer)
152        }
153    }
154
155    async fn write_buffer(&mut self, buffer: &[u8], mut stream: Self::OutputStream)
156        -> Result<Self::OutputStream, Self::ErrorKind>
157    {
158        match stream.write_all(buffer).await {
159            Ok(_) => Ok(stream),
160            Err(e) => Err(ErrorKind::RawStreamIOFailed(e))
161        }
162
163        // also could be written as:
164        //  stream.write_all(buffer)
165        //         .await
166        //         .map(|_| stream)
167        //         .map_err(|e| ErrorKind::RawStreamIOFailed(e))
168    }
169
170    async fn read_buffer(&mut self, buffer: &mut [u8], mut stream: Self::OutputStream)
171        -> Result<Self::OutputStream, Self::ErrorKind>
172    {
173        match stream.read(buffer).await {
174            Ok(_) => Ok(stream),
175            Err(e) => Err(ErrorKind::RawStreamIOFailed(e))
176        }
177    }
178
179    async fn drop_stream(&mut self, _: Self::OutputStream)
180        -> Result<(), Self::ErrorKind>
181    {
182        Ok(())
183    }
184}