rd_std/socks5/
server.rs

1use super::common::{pack_udp, parse_udp, Address};
2use super::protocol::{
3    self, AuthMethod, AuthRequest, AuthResponse, CommandRequest, CommandResponse, Version,
4};
5use protocol::Command;
6use rd_interface::{
7    async_trait,
8    util::{connect_tcp, connect_udp},
9    Context, IServer, IUdpChannel, IntoAddress, IntoDyn, Net, Result, TcpStream, UdpSocket,
10};
11use std::{
12    net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4},
13    sync::{Arc, RwLock},
14};
15use tokio::io::{split, AsyncWriteExt, BufWriter};
16
17struct Config {
18    net: Net,
19    listen_net: Net,
20}
21
22#[derive(Clone)]
23pub struct Socks5Server {
24    cfg: Arc<Config>,
25}
26
27impl Socks5Server {
28    pub async fn serve_connection(self, socket: TcpStream, addr: SocketAddr) -> anyhow::Result<()> {
29        let default_addr: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
30        let Config { net, listen_net } = &*self.cfg;
31        let local_ip = socket.local_addr().await?.ip();
32        let (mut rx, tx) = split(socket);
33        let mut tx = BufWriter::with_capacity(512, tx);
34
35        let version = Version::read(&mut rx).await?;
36        let auth_req = AuthRequest::read(&mut rx).await?;
37
38        let method = auth_req.select_from(&[AuthMethod::Noauth]);
39        let auth_resp = AuthResponse::new(method);
40        // TODO: do auth here
41
42        version.write(&mut tx).await?;
43        auth_resp.write(&mut tx).await?;
44        tx.flush().await?;
45
46        let cmd_req = CommandRequest::read(&mut rx).await?;
47
48        match cmd_req.command {
49            Command::Connect => {
50                let dst = cmd_req.address.into();
51                let out = match net
52                    .tcp_connect(&mut Context::from_socketaddr(addr), dst)
53                    .await
54                {
55                    Ok(socket) => socket,
56                    Err(e) => {
57                        CommandResponse::error(e).write(&mut tx).await?;
58                        tx.flush().await?;
59                        return Ok(());
60                    }
61                };
62
63                let addr: Address = out.local_addr().await.unwrap_or(default_addr).into();
64                CommandResponse::success(addr).write(&mut tx).await?;
65                tx.flush().await?;
66
67                let socket = rx.unsplit(tx.into_inner());
68
69                connect_tcp(out, socket).await?;
70            }
71            Command::UdpAssociate => {
72                let dst = match cmd_req.address {
73                    Address::SocketAddr(SocketAddr::V4(_)) => rd_interface::Address::SocketAddr(
74                        SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0),
75                    ),
76                    Address::SocketAddr(SocketAddr::V6(_)) => rd_interface::Address::SocketAddr(
77                        SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
78                    ),
79                    _ => {
80                        CommandResponse::reply_error(
81                            protocol::CommandReply::AddressTypeNotSupported,
82                        )
83                        .write(&mut tx)
84                        .await?;
85
86                        tx.flush().await?;
87                        return Ok(());
88                    }
89                };
90                let out = match net
91                    .udp_bind(&mut Context::from_socketaddr(addr), dst.into())
92                    .await
93                {
94                    Ok(socket) => socket,
95                    Err(e) => {
96                        CommandResponse::error(e).write(&mut tx).await?;
97                        tx.flush().await?;
98                        return Ok(());
99                    }
100                };
101                let udp = listen_net
102                    .udp_bind(
103                        &mut Context::from_socketaddr(addr),
104                        "0.0.0.0:0".into_address()?,
105                    )
106                    .await?;
107
108                // success
109                let udp_port = match udp.local_addr().await {
110                    Ok(a) => a.port(),
111                    Err(e) => {
112                        CommandResponse::error(e).write(&mut tx).await?;
113                        tx.flush().await?;
114                        return Ok(());
115                    }
116                };
117                let addr: SocketAddr = (local_ip, udp_port).into();
118                let addr: Address = addr.into();
119
120                CommandResponse::success(addr).write(&mut tx).await?;
121                tx.flush().await?;
122
123                let socket = rx.unsplit(tx.into_inner());
124
125                let udp_channel = Socks5UdpSocket(udp, socket, RwLock::new(None));
126                connect_udp(udp_channel.into_dyn(), out).await?;
127            }
128            _ => {
129                return Ok(());
130            }
131        };
132
133        Ok(())
134    }
135    pub fn new(listen_net: Net, net: Net) -> Self {
136        Self {
137            cfg: Arc::new(Config { net, listen_net }),
138        }
139    }
140}
141
142pub struct Socks5UdpSocket(UdpSocket, TcpStream, RwLock<Option<SocketAddr>>);
143
144#[async_trait]
145impl IUdpChannel for Socks5UdpSocket {
146    async fn recv_send_to(&self, buf: &mut [u8]) -> Result<(usize, rd_interface::Address)> {
147        // 259 is max size of address, atype 1 + domain len 1 + domain 255 + port 2
148        let bytes_size = 259 + buf.len();
149        let mut bytes = vec![0u8; bytes_size];
150        let (recv_len, from_addr) = self.0.recv_from(&mut bytes).await?;
151        let saved_addr = { *self.2.read().unwrap() };
152        if let None = saved_addr {
153            *self.2.write().unwrap() = Some(from_addr);
154        }
155        bytes.truncate(recv_len);
156
157        let (addr, payload) = parse_udp(&bytes).await?;
158        let to_copy = payload.len().min(buf.len());
159        buf[..to_copy].copy_from_slice(&payload[..to_copy]);
160
161        Ok((to_copy, addr.into()))
162    }
163
164    async fn send_recv_from(&self, buf: &[u8], addr: SocketAddr) -> Result<usize> {
165        let saddr: Address = addr.into();
166
167        let bytes = pack_udp(saddr, buf).await?;
168
169        let addr = { *self.2.read().unwrap() };
170        Ok(if let Some(addr) = addr {
171            self.0.send_to(&bytes, addr.into()).await?
172        } else {
173            0
174        })
175    }
176}
177
178pub struct Socks5 {
179    server: Socks5Server,
180    listen_net: Net,
181    bind: String,
182}
183
184#[async_trait]
185impl IServer for Socks5 {
186    async fn start(&self) -> Result<()> {
187        let listener = self
188            .listen_net
189            .tcp_bind(&mut Context::new(), self.bind.into_address()?)
190            .await?;
191
192        loop {
193            let (socket, addr) = listener.accept().await?;
194            let server = self.server.clone();
195            let _ = tokio::spawn(async move {
196                if let Err(e) = server.serve_connection(socket, addr).await {
197                    log::error!("Error when serve_connection: {:?}", e)
198                }
199            });
200        }
201    }
202}
203
204impl Socks5 {
205    pub fn new(listen_net: Net, net: Net, bind: String) -> Self {
206        Socks5 {
207            server: Socks5Server::new(listen_net.clone(), net),
208            listen_net,
209            bind,
210        }
211    }
212}